• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 19181077268

07 Nov 2025 09:00PM UTC coverage: 86.352% (-0.5%) from 86.841%
19181077268

Pull #1059

github

web-flow
Merge fb47045e6 into 58a6e4765
Pull Request #1059: fix: patching of session custom resources

89 of 104 new or added lines in 5 files covered. (85.58%)

577 existing lines in 33 files now uncovered.

22791 of 26393 relevant lines covered (86.35%)

1.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.26
/components/renku_data_services/crc/db.py
1
"""Adapter based on SQLAlchemy.
2

3
These adapters currently do a few things (1) generate SQL queries, (2) apply resource access controls,
4
(3) fetch the SQL results and (4) format them into a workable representation. In the future and as the code
5
grows it is worth looking into separating this functionality into separate classes rather than having
6
it all in one place.
7
"""
8

9
from asyncio import gather
2✔
10
from collections.abc import AsyncGenerator, Callable, Collection, Coroutine, Sequence
2✔
11
from dataclasses import asdict, dataclass, field
2✔
12
from functools import wraps
2✔
13
from typing import Any, Concatenate, Optional, ParamSpec, TypeVar
2✔
14

15
from sqlalchemy import NullPool, delete, false, select, true
2✔
16
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
2✔
17
from sqlalchemy.orm import selectinload
2✔
18
from sqlalchemy.sql import Select, and_, not_, or_
2✔
19
from ulid import ULID
2✔
20

21
import renku_data_services.base_models as base_models
2✔
22
from renku_data_services import errors
2✔
23
from renku_data_services.base_models import RESET
2✔
24
from renku_data_services.crc import models
2✔
25
from renku_data_services.crc import orm as schemas
2✔
26
from renku_data_services.crc.core import validate_resource_class_update, validate_resource_pool_update
2✔
27
from renku_data_services.crc.models import ClusterPatch, ClusterSettings, SavedClusterSettings, SessionProtocol
2✔
28
from renku_data_services.crc.orm import ClusterORM
2✔
29
from renku_data_services.k8s.db import QuotaRepository
2✔
30
from renku_data_services.users.db import UserRepo
2✔
31

32

33
class _Base:
2✔
34
    def __init__(self, session_maker: Callable[..., AsyncSession], quotas_repo: QuotaRepository) -> None:
2✔
35
        self.session_maker = session_maker
2✔
36
        self.quotas_repo = quotas_repo
2✔
37

38

39
def _resource_pool_access_control(
2✔
40
    api_user: base_models.APIUser,
41
    stmt: Select[tuple[schemas.ResourcePoolORM]],
42
) -> Select[tuple[schemas.ResourcePoolORM]]:
43
    """Modifies a select query to list resource pools based on whether the user is logged in or not."""
44
    output = stmt
2✔
45
    match (api_user.is_authenticated, api_user.is_admin):
2✔
46
        case True, False:
2✔
47
            # The user is logged in but not an admin
48
            api_user_has_default_pool_access = not_(
1✔
49
                # NOTE: The only way to check that a user is allowed to access the default pool is that such a
50
                # record does NOT EXIST in the database
51
                select(schemas.UserORM.no_default_access)
52
                .where(and_(schemas.UserORM.keycloak_id == api_user.id, schemas.UserORM.no_default_access == true()))
53
                .exists()
54
            )
55
            output = output.join(schemas.UserORM, schemas.ResourcePoolORM.users, isouter=True).where(
1✔
56
                or_(
57
                    schemas.UserORM.keycloak_id == api_user.id,  # the user is part of the pool
58
                    and_(  # the pool is not default but is public
59
                        schemas.ResourcePoolORM.default != true(), schemas.ResourcePoolORM.public == true()
60
                    ),
61
                    and_(  # the pool is default and the user is not prohibited from accessing it
62
                        schemas.ResourcePoolORM.default == true(),
63
                        api_user_has_default_pool_access,
64
                    ),
65
                )
66
            )
67
        case True, True:
2✔
68
            # The user is logged in and is an admin, they can see all resource pools
69
            pass
2✔
70
        case False, _:
1✔
71
            # The user is not logged in, they can see only the public resource pools
72
            output = output.where(schemas.ResourcePoolORM.public == true())
1✔
73
    return output
2✔
74

75

76
def _classes_user_access_control(
2✔
77
    api_user: base_models.APIUser,
78
    stmt: Select[tuple[schemas.ResourceClassORM]],
79
) -> Select[tuple[schemas.ResourceClassORM]]:
80
    """Adjust the select statement for classes based on whether the user is logged in or not."""
81
    output = stmt
2✔
82
    match (api_user.is_authenticated, api_user.is_admin):
2✔
83
        case True, False:
2✔
84
            # The user is logged in but is not an admin
85
            api_user_has_default_pool_access = not_(
1✔
86
                # NOTE: The only way to check that a user is allowed to access the default pool is that such a
87
                # record does NOT EXIST in the database
88
                select(schemas.UserORM.no_default_access)
89
                .where(and_(schemas.UserORM.keycloak_id == api_user.id, schemas.UserORM.no_default_access == true()))
90
                .exists()
91
            )
92
            output = output.join(schemas.UserORM, schemas.ResourcePoolORM.users, isouter=True).where(
1✔
93
                or_(
94
                    schemas.UserORM.keycloak_id == api_user.id,  # the user is part of the pool
95
                    and_(  # the pool is not default but is public
96
                        schemas.ResourcePoolORM.default != true(), schemas.ResourcePoolORM.public == true()
97
                    ),
98
                    and_(  # the pool is default and the user is not prohibited from accessing it
99
                        schemas.ResourcePoolORM.default == true(),
100
                        api_user_has_default_pool_access,
101
                    ),
102
                )
103
            )
104
        case True, True:
2✔
105
            # The user is logged in and is an admin, they can see all resource classes
106
            pass
2✔
107
        case False, _:
×
108
            # The user is not logged in, they can see only the classes from public resource pools
109
            output = output.join(schemas.UserORM, schemas.ResourcePoolORM.users, isouter=True).where(
×
110
                schemas.ResourcePoolORM.public == true(),
111
            )
112
    return output
2✔
113

114

115
_P = ParamSpec("_P")
2✔
116
_T = TypeVar("_T")
2✔
117

118

119
def _only_admins(
2✔
120
    f: Callable[Concatenate[Any, _P], Coroutine[Any, Any, _T]],
121
) -> Callable[Concatenate[Any, _P], Coroutine[Any, Any, _T]]:
122
    """Decorator that errors out if the user is not an admin.
123

124
    It expects the APIUser model to be a named parameter in the decorated function or
125
    to be the first parameter (after self).
126
    """
127

128
    @wraps(f)
2✔
129
    async def decorated_function(self: Any, *args: _P.args, **kwargs: _P.kwargs) -> _T:
2✔
130
        api_user = None
2✔
131
        if "api_user" in kwargs:
2✔
132
            api_user = kwargs["api_user"]
2✔
133
        elif len(args) >= 1:
2✔
134
            api_user = args[0]
2✔
135
        if api_user is not None and not isinstance(api_user, base_models.APIUser):
2✔
136
            raise errors.ProgrammingError(message="Expected user parameter is not of type APIUser.")
×
137
        if api_user is None:
2✔
138
            raise errors.UnauthorizedError(message="You do not have the required permissions for this operation.")
×
139
        if not api_user.is_admin:
2✔
140
            raise errors.ForbiddenError(message="You do not have the required permissions for this operation.")
×
141

142
        # the user is authenticated and is an admin
143
        response = await f(self, *args, **kwargs)
2✔
144
        return response
2✔
145

146
    return decorated_function
2✔
147

148

149
class ResourcePoolRepository(_Base):
2✔
150
    """The adapter used for accessing resource pools with SQLAlchemy."""
151

152
    def __init__(self, session_maker: Callable[..., AsyncSession], quotas_repo: QuotaRepository):
2✔
153
        super().__init__(session_maker, quotas_repo)
2✔
154
        self.__cluster_repo = ClusterRepository(session_maker=self.session_maker)
2✔
155

156
    async def initialize(self, async_connection_url: str, rp: models.UnsavedResourcePool) -> None:
2✔
157
        """Add the default resource pool if it does not already exist."""
158
        engine = create_async_engine(async_connection_url, poolclass=NullPool)
×
159
        session_maker = async_sessionmaker(
×
160
            engine,
161
            expire_on_commit=True,
162
        )
163
        async with session_maker() as session, session.begin():
×
164
            stmt = select(schemas.ResourcePoolORM.default == true())
×
165
            res = await session.execute(stmt)
×
166
            default_rp = res.scalars().first()
×
167
            if default_rp is None:
×
168
                orm = schemas.ResourcePoolORM.from_unsaved_model(new_resource_pool=rp, quota=None, cluster=None)
×
169
                session.add(orm)
×
170

171
    async def get_resource_pools(
2✔
172
        self, api_user: base_models.APIUser, id: Optional[int] = None, name: Optional[str] = None
173
    ) -> list[models.ResourcePool]:
174
        """Get resource pools from database."""
175
        async with self.session_maker() as session:
2✔
176
            stmt = (
2✔
177
                select(schemas.ResourcePoolORM)
178
                .options(selectinload(schemas.ResourcePoolORM.classes))
179
                .options(selectinload(schemas.ResourcePoolORM.cluster))
180
            )
181
            if name is not None:
2✔
182
                stmt = stmt.where(schemas.ResourcePoolORM.name == name)
1✔
183
            if id is not None:
2✔
184
                stmt = stmt.where(schemas.ResourcePoolORM.id == id)
2✔
185
            # NOTE: The line below ensures that the right users can access the right resources, do not remove.
186
            stmt = _resource_pool_access_control(api_user, stmt)
2✔
187
            res = await session.execute(stmt)
2✔
188
            orms = res.scalars().all()
2✔
189
            output: list[models.ResourcePool] = []
2✔
190
            for rp in orms:
2✔
191
                quota = self.quotas_repo.get_quota(rp.quota) if rp.quota else None
1✔
192
                output.append(rp.dump(quota))
1✔
193
            return output
2✔
194

195
    async def get_resource_pool_from_class(
2✔
196
        self, api_user: base_models.APIUser, resource_class_id: int
197
    ) -> models.ResourcePool:
198
        """Get the resource pool the class belongs to."""
199
        async with self.session_maker() as session:
1✔
200
            stmt = (
1✔
201
                select(schemas.ResourcePoolORM)
202
                .where(schemas.ResourcePoolORM.classes.any(schemas.ResourceClassORM.id == resource_class_id))
203
                .options(selectinload(schemas.ResourcePoolORM.classes))
204
                .options(selectinload(schemas.ResourcePoolORM.cluster))
205
            )
206
            # NOTE: The line below ensures that the right users can access the right resources, do not remove.
207
            stmt = _resource_pool_access_control(api_user, stmt)
1✔
208
            res = await session.execute(stmt)
1✔
209
            orm = res.scalar()
1✔
210
            if orm is None:
1✔
UNCOV
211
                raise errors.MissingResourceError(
×
212
                    message=f"Could not find the resource pool where a class with ID {resource_class_id} exists."
213
                )
214
            quota = self.quotas_repo.get_quota(orm.quota) if orm.quota else None
1✔
215
            return orm.dump(quota)
1✔
216

217
    async def get_default_resource_pool(self) -> models.ResourcePool:
2✔
218
        """Get the default resource pool."""
219
        async with self.session_maker() as session:
×
220
            stmt = (
×
221
                select(schemas.ResourcePoolORM)
222
                .where(schemas.ResourcePoolORM.default == true())
223
                .options(selectinload(schemas.ResourcePoolORM.classes))
224
            )
225
            res = await session.scalar(stmt)
×
226
            if res is None:
×
227
                raise errors.ProgrammingError(
×
228
                    message="Could not find the default resource pool, but this has to exist."
229
                )
230
            quota = self.quotas_repo.get_quota(res.quota) if res.quota else None
×
231
            return res.dump(quota)
×
232

233
    async def get_default_resource_class(self) -> models.ResourceClass:
2✔
234
        """Get the default resource class in the default resource pool."""
235
        async with self.session_maker() as session:
×
236
            stmt = (
×
237
                select(schemas.ResourceClassORM)
238
                .where(schemas.ResourceClassORM.default == true())
239
                .where(schemas.ResourceClassORM.resource_pool.has(schemas.ResourcePoolORM.default == true()))
240
            )
241
            res = await session.scalar(stmt)
×
242
            if res is None:
×
243
                raise errors.ProgrammingError(
×
244
                    message="Could not find the default class from the default resource pool, but this has to exist."
245
                )
246
            return res.dump()
×
247

248
    async def filter_resource_pools(
2✔
249
        self,
250
        api_user: base_models.APIUser,
251
        cpu: float = 0,
252
        memory: int = 0,
253
        max_storage: int = 0,
254
        gpu: int = 0,
255
    ) -> list[models.ResourcePool]:
256
        """Get resource pools from database with indication of which resource class matches the specified criteria."""
257
        async with self.session_maker() as session:
2✔
258
            criteria = models.UnsavedResourceClass(
2✔
259
                name="criteria",
260
                cpu=cpu,
261
                gpu=gpu,
262
                memory=memory,
263
                max_storage=max_storage,
264
                # NOTE: the default storage has to be <= max_storage but is not used for filtering classes,
265
                # only the max_storage is used to filter resource classes that match a request
266
                default_storage=max_storage,
267
            )
268
            stmt = (
2✔
269
                select(schemas.ResourcePoolORM)
270
                .distinct()
271
                .options(selectinload(schemas.ResourcePoolORM.classes))
272
                .order_by(
273
                    schemas.ResourcePoolORM.id,
274
                    schemas.ResourcePoolORM.name,
275
                )
276
            )
277
            # NOTE: The line below ensures that the right users can access the right resources, do not remove.
278
            stmt = _resource_pool_access_control(api_user, stmt)
2✔
279
            res = await session.execute(stmt)
2✔
280
            return [
2✔
281
                i.dump(quota=self.quotas_repo.get_quota(i.quota), class_match_criteria=criteria)
282
                for i in res.scalars().all()
283
            ]
284

285
    @_only_admins
2✔
286
    async def insert_resource_pool(
2✔
287
        self, api_user: base_models.APIUser, new_resource_pool: models.UnsavedResourcePool
288
    ) -> models.ResourcePool:
289
        """Insert resource pool into database."""
290

291
        cluster = None
1✔
292
        if new_resource_pool.cluster_id:
1✔
293
            cluster = await self.__cluster_repo.select(cluster_id=new_resource_pool.cluster_id)
1✔
294

295
        quota = None
1✔
296
        if new_resource_pool.quota is not None:
1✔
297
            quota = self.quotas_repo.create_quota(new_quota=new_resource_pool.quota)
1✔
298

299
        async with self.session_maker() as session, session.begin():
1✔
300
            resource_pool = schemas.ResourcePoolORM.from_unsaved_model(
1✔
301
                new_resource_pool=new_resource_pool, quota=quota, cluster=cluster
302
            )
303
            if resource_pool.default:
1✔
304
                stmt = select(schemas.ResourcePoolORM).where(schemas.ResourcePoolORM.default == true())
1✔
305
                res = await session.execute(stmt)
1✔
306
                default_rps = res.unique().scalars().all()
1✔
307
                if len(default_rps) >= 1:
1✔
308
                    raise errors.ValidationError(
×
309
                        message="There can only be one default resource pool and one already exists."
310
                    )
311

312
            session.add(resource_pool)
1✔
313
            await session.flush()
1✔
314
            await session.refresh(resource_pool)
1✔
315
            return resource_pool.dump(quota=quota)
1✔
316

317
    async def get_classes(
2✔
318
        self,
319
        api_user: Optional[base_models.APIUser] = None,
320
        id: Optional[int] = None,
321
        name: Optional[str] = None,
322
        resource_pool_id: Optional[int] = None,
323
    ) -> list[models.ResourceClass]:
324
        """Get classes from the database."""
325
        async with self.session_maker() as session:
2✔
326
            stmt = select(schemas.ResourceClassORM).join(
2✔
327
                schemas.ResourcePoolORM, schemas.ResourceClassORM.resource_pool, isouter=True
328
            )
329
            if resource_pool_id is not None:
2✔
330
                stmt = stmt.where(schemas.ResourcePoolORM.id == resource_pool_id)
2✔
331
            if id is not None:
2✔
332
                stmt = stmt.where(schemas.ResourceClassORM.id == id)
2✔
333
            if name is not None:
2✔
334
                stmt = stmt.where(schemas.ResourceClassORM.name == name)
2✔
335

336
            # Apply user access control if api_user is provided
337
            if api_user is not None:
2✔
338
                # NOTE: The line below ensures that the right users can access the right resources, do not remove.
339
                stmt = _classes_user_access_control(api_user, stmt)
2✔
340

341
            res = await session.execute(stmt)
2✔
342
            orms = res.scalars().all()
2✔
343
            return [orm.dump() for orm in orms]
2✔
344

345
    async def get_resource_class(self, api_user: base_models.APIUser, id: int) -> models.ResourceClass:
2✔
346
        """Get a specific resource class by its ID."""
347
        classes = await self.get_classes(api_user, id)
×
348
        if len(classes) == 0:
×
349
            raise errors.MissingResourceError(message=f"The resource class with ID {id} cannot be found")
×
350
        return classes[0]
×
351

352
    @_only_admins
2✔
353
    async def insert_resource_class(
2✔
354
        self,
355
        api_user: base_models.APIUser,
356
        new_resource_class: models.UnsavedResourceClass,
357
        *,
358
        resource_pool_id: Optional[int] = None,
359
    ) -> models.ResourceClass:
360
        """Insert a resource class in the database."""
361
        async with self.session_maker() as session, session.begin():
2✔
362
            resource_class = schemas.ResourceClassORM.from_unsaved_model(
2✔
363
                new_resource_class=new_resource_class, resource_pool_id=resource_pool_id
364
            )
365
            print(f"resource_class = {resource_class.resource_pool_id}")
2✔
366

367
            if resource_pool_id is not None:
2✔
368
                stmt = select(schemas.ResourcePoolORM).where(schemas.ResourcePoolORM.id == resource_pool_id)
2✔
369
                res = await session.execute(stmt)
2✔
370
                rp = res.scalars().first()
2✔
371
                if rp is None:
2✔
372
                    raise errors.MissingResourceError(
2✔
373
                        message=f"Resource pool with id {resource_pool_id} does not exist."
374
                    )
375
                resource_class.resource_pool = rp
1✔
376
                if resource_class.default and len(rp.classes) > 0 and any([icls.default for icls in rp.classes]):
1✔
377
                    raise errors.ValidationError(
×
378
                        message="There can only be one default resource class per resource pool."
379
                    )
380
                quota = self.quotas_repo.get_quota(rp.quota) if rp.quota else None
1✔
381
                if quota and not quota.is_resource_class_compatible(new_resource_class):
1✔
382
                    raise errors.ValidationError(
×
383
                        message="The resource class {resource_class} is not compatible with the quota {quota}."
384
                    )
385

386
            session.add(resource_class)
1✔
387
            await session.flush()
1✔
388
            await session.refresh(resource_class)
1✔
389
            return resource_class.dump()
1✔
390

391
    @_only_admins
2✔
392
    async def update_resource_pool(
2✔
393
        self, api_user: base_models.APIUser, resource_pool_id: int, update: models.ResourcePoolPatch
394
    ) -> models.ResourcePool:
395
        """Update an existing resource pool in the database."""
396
        async with self.session_maker() as session, session.begin():
2✔
397
            stmt = (
2✔
398
                select(schemas.ResourcePoolORM)
399
                .where(schemas.ResourcePoolORM.id == resource_pool_id)
400
                .options(selectinload(schemas.ResourcePoolORM.classes))
401
            )
402
            res = await session.scalars(stmt)
2✔
403
            rp = res.one_or_none()
2✔
404
            if rp is None:
2✔
405
                raise errors.MissingResourceError(message=f"Resource pool with id {resource_pool_id} cannot be found")
2✔
406
            quota = self.quotas_repo.get_quota(rp.quota) if rp.quota else None
1✔
407

408
            validate_resource_pool_update(existing=rp.dump(quota=quota), update=update)
1✔
409

410
            if update.name is not None:
1✔
411
                rp.name = update.name
1✔
412
            if update.public is not None:
1✔
413
                rp.public = update.public
1✔
414
            if update.default is not None:
1✔
415
                rp.default = update.default
1✔
416
            if update.idle_threshold == 0 or update.idle_threshold is RESET:
1✔
417
                # Using "0" removes the value
418
                rp.idle_threshold = None
×
419
            elif isinstance(update.idle_threshold, int):
1✔
420
                rp.idle_threshold = update.idle_threshold
1✔
421
            if update.hibernation_threshold == 0 or update.hibernation_threshold is RESET:
1✔
422
                # Using "0" removes the value
423
                rp.hibernation_threshold = None
×
424
            elif isinstance(update.hibernation_threshold, int):
1✔
425
                rp.hibernation_threshold = update.hibernation_threshold
1✔
426

427
            if update.cluster_id is RESET:
1✔
428
                rp.cluster_id = None
1✔
429
            elif update.cluster_id is not None:
1✔
430
                cluster = await self.__cluster_repo.select(update.cluster_id)
×
431
                rp.cluster_id = cluster.id
×
432

433
            if update.quota is RESET and rp.quota:
1✔
434
                # Remove the existing quota
435
                self.quotas_repo.delete_quota(name=rp.quota)
×
436
            elif isinstance(update.quota, models.QuotaPatch) and rp.quota is None:
1✔
437
                # Create a new quota, the `update.quota` object has already been validated
438
                assert update.quota.cpu is not None
×
439
                assert update.quota.memory is not None
×
440
                assert update.quota.gpu is not None
×
441
                new_quota = models.UnsavedQuota(
×
442
                    cpu=update.quota.cpu,
443
                    memory=update.quota.memory,
444
                    gpu=update.quota.gpu,
445
                )
446
                quota = self.quotas_repo.create_quota(new_quota=new_quota)
×
447
                rp.quota = quota.id
×
448
            elif isinstance(update.quota, models.QuotaPatch):
1✔
449
                assert rp.quota is not None
1✔
450
                assert quota is not None
1✔
451
                # Update the existing quota
452
                updated_quota = models.Quota(
1✔
453
                    cpu=update.quota.cpu if update.quota.cpu is not None else quota.cpu,
454
                    memory=update.quota.memory if update.quota.memory is not None else quota.memory,
455
                    gpu=update.quota.gpu if update.quota.gpu is not None else quota.gpu,
456
                    gpu_kind=update.quota.gpu_kind if update.quota.gpu_kind is not None else quota.gpu_kind,
457
                    id=quota.id,
458
                )
459
                quota = self.quotas_repo.update_quota(quota=updated_quota)
1✔
460
                rp.quota = quota.id
1✔
461

462
            new_classes_coroutines = []
1✔
463
            if update.classes is not None:
1✔
464
                for rc in update.classes:
1✔
465
                    new_classes_coroutines.append(
1✔
466
                        self.update_resource_class(
467
                            api_user=api_user, resource_pool_id=resource_pool_id, resource_class_id=rc.id, update=rc
468
                        )
469
                    )
470

471
            if update.remote is RESET:
1✔
472
                rp.remote_provider_id = None
1✔
473
                rp.remote_json = None
1✔
474
            elif update.remote is not None:
1✔
475
                rp.remote_provider_id = (
1✔
476
                    update.remote.provider_id if update.remote.provider_id is not None else rp.remote_provider_id
477
                )
478
                remote_json = rp.remote_json if rp.remote_json is not None else dict()
1✔
479
                remote_json.update(update.remote.to_dict())
1✔
480
                del remote_json["provider_id"]
1✔
481
                rp.remote_json = remote_json
1✔
482

483
            await gather(*new_classes_coroutines)
1✔
484
            await session.flush()
1✔
485
            await session.refresh(rp)
1✔
486
            return rp.dump(quota=quota)
1✔
487

488
    @_only_admins
2✔
489
    async def delete_resource_pool(self, api_user: base_models.APIUser, id: int) -> None:
2✔
490
        """Delete a resource pool from the database."""
491
        async with self.session_maker() as session, session.begin():
2✔
492
            stmt = select(schemas.ResourcePoolORM).where(schemas.ResourcePoolORM.id == id)
2✔
493
            res = await session.execute(stmt)
2✔
494
            rp = res.scalars().first()
2✔
495
            if rp is not None:
2✔
496
                if rp.default:
1✔
497
                    raise errors.ValidationError(message="The default resource pool cannot be deleted.")
×
498
                await session.delete(rp)
1✔
499
                if rp.quota:
1✔
500
                    self.quotas_repo.delete_quota(rp.quota)
1✔
501
            return None
2✔
502

503
    @_only_admins
2✔
504
    async def delete_resource_class(
2✔
505
        self, api_user: base_models.APIUser, resource_pool_id: int, resource_class_id: int
506
    ) -> None:
507
        """Delete a specific resource class."""
508
        async with self.session_maker() as session, session.begin():
2✔
509
            stmt = (
2✔
510
                select(schemas.ResourceClassORM)
511
                .where(schemas.ResourceClassORM.id == resource_class_id)
512
                .where(schemas.ResourceClassORM.resource_pool_id == resource_pool_id)
513
            )
514
            res = await session.execute(stmt)
2✔
515
            cls = res.scalars().first()
2✔
516
            if cls is not None:
2✔
517
                if cls.default:
1✔
518
                    raise errors.ValidationError(message="The default resource class cannot be deleted.")
×
519
                await session.delete(cls)
1✔
520

521
    @_only_admins
2✔
522
    async def update_resource_class(
2✔
523
        self,
524
        api_user: base_models.APIUser,
525
        resource_pool_id: int,
526
        resource_class_id: int,
527
        update: models.ResourceClassPatch,
528
    ) -> models.ResourceClass:
529
        """Update a specific resource class."""
530
        async with self.session_maker() as session, session.begin():
2✔
531
            stmt = (
2✔
532
                select(schemas.ResourceClassORM)
533
                .where(schemas.ResourceClassORM.id == resource_class_id)
534
                .where(schemas.ResourceClassORM.resource_pool_id == resource_pool_id)
535
                .join(schemas.ResourcePoolORM, schemas.ResourceClassORM.resource_pool)
536
                .options(selectinload(schemas.ResourceClassORM.resource_pool))
537
            )
538
            res = await session.scalars(stmt)
2✔
539
            cls = res.one_or_none()
2✔
540
            if cls is None:
2✔
541
                raise errors.MissingResourceError(
1✔
542
                    message=(
543
                        f"The resource class with id {resource_class_id} does not exist, the resource pool with "
544
                        f"id {resource_pool_id} does not exist or the requested resource class is not "
545
                        "associated with the resource pool"
546
                    )
547
                )
548

549
            validate_resource_class_update(existing=cls.dump(), update=update)
1✔
550

551
            # NOTE: updating the 'default' field is not supported, so it is skipped below
552
            if update.name is not None:
1✔
553
                cls.name = update.name
1✔
554
            if update.cpu is not None:
1✔
555
                cls.cpu = update.cpu
1✔
556
            if update.memory is not None:
1✔
557
                cls.memory = update.memory
1✔
558
            if update.max_storage is not None:
1✔
559
                cls.max_storage = update.max_storage
1✔
560
            if update.gpu is not None:
1✔
561
                cls.gpu = update.gpu
1✔
562
            if update.default_storage is not None:
1✔
563
                cls.default_storage = update.default_storage
1✔
564

565
            if update.node_affinities is not None:
1✔
566
                existing_affinities: dict[str, schemas.NodeAffintyORM] = {i.key: i for i in cls.node_affinities}
1✔
567
                new_affinities: dict[str, schemas.NodeAffintyORM] = {
1✔
568
                    i.key: schemas.NodeAffintyORM(
569
                        key=i.key,
570
                        required_during_scheduling=i.required_during_scheduling,
571
                    )
572
                    for i in update.node_affinities
573
                }
574
                for new_affinity_key, new_affinity in new_affinities.items():
1✔
575
                    if new_affinity_key in existing_affinities:
1✔
576
                        # UPDATE existing affinity
577
                        existing_affinity = existing_affinities[new_affinity_key]
1✔
578
                        if new_affinity.required_during_scheduling != existing_affinity.required_during_scheduling:
1✔
579
                            existing_affinity.required_during_scheduling = new_affinity.required_during_scheduling
1✔
580
                    else:
581
                        # CREATE a brand new affinity
582
                        cls.node_affinities.append(new_affinity)
1✔
583
                # REMOVE an affinity
584
                for existing_affinity_key, existing_affinity in existing_affinities.items():
1✔
585
                    if existing_affinity_key not in new_affinities:
1✔
586
                        cls.node_affinities.remove(existing_affinity)
1✔
587

588
            if update.tolerations is not None:
1✔
589
                existing_tolerations: dict[str, schemas.TolerationORM] = {tol.key: tol for tol in cls.tolerations}
1✔
590
                new_tolerations: dict[str, schemas.TolerationORM] = {
1✔
591
                    tol: schemas.TolerationORM(key=tol) for tol in update.tolerations
592
                }
593
                for new_tol_key, new_tol in new_tolerations.items():
1✔
594
                    if new_tol_key not in existing_tolerations:
1✔
595
                        # CREATE a brand new toleration
596
                        cls.tolerations.append(new_tol)
1✔
597
                # REMOVE a toleration
598
                for existing_tol_key, existing_tol in existing_tolerations.items():
1✔
599
                    if existing_tol_key not in new_tolerations:
1✔
600
                        cls.tolerations.remove(existing_tol)
1✔
601

602
            # NOTE: do we need to perform this check?
603
            if cls.resource_pool is None:
1✔
604
                raise errors.BaseError(
×
605
                    message="Unexpected internal error.",
606
                    detail=f"The resource class {resource_class_id} is not associated with any resource pool.",
607
                )
608

609
            await session.flush()
1✔
610
            await session.refresh(cls)
1✔
611

612
            cls_model = cls.dump()
1✔
613
            quota = self.quotas_repo.get_quota(cls_model.quota) if cls_model.quota else None
1✔
614
            if quota and not quota.is_resource_class_compatible(cls_model):
1✔
615
                raise errors.ValidationError(
×
616
                    message=f"The resource class {cls_model} is not compatible with the quota {quota}"
617
                )
618

619
            return cls_model
1✔
620

621
    @_only_admins
2✔
622
    async def get_tolerations(self, api_user: base_models.APIUser, resource_pool_id: int, class_id: int) -> list[str]:
2✔
623
        """Get all tolerations of a resource class."""
624
        async with self.session_maker() as session:
2✔
625
            res_classes = await self.get_classes(api_user, class_id, resource_pool_id=resource_pool_id)
2✔
626
            if len(res_classes) == 0:
2✔
627
                raise errors.MissingResourceError(
1✔
628
                    message=f"The resource pool with ID {resource_pool_id} or the resource "
629
                    f"class with ID {class_id} do not exist, or they are not related."
630
                )
631
            stmt = select(schemas.TolerationORM).where(schemas.TolerationORM.resource_class_id == class_id)
1✔
632
            res = await session.execute(stmt)
1✔
633
            return [i.key for i in res.scalars().all()]
1✔
634

635
    @_only_admins
2✔
636
    async def delete_tolerations(self, api_user: base_models.APIUser, resource_pool_id: int, class_id: int) -> None:
2✔
637
        """Delete all tolerations for a specific resource class."""
638
        async with self.session_maker() as session, session.begin():
2✔
639
            res_classes = await self.get_classes(api_user, class_id, resource_pool_id=resource_pool_id)
2✔
640
            if len(res_classes) == 0:
2✔
641
                raise errors.MissingResourceError(
1✔
642
                    message=f"The resource pool with ID {resource_pool_id} or the resource "
643
                    f"class with ID {class_id} do not exist, or they are not related."
644
                )
645
            stmt = delete(schemas.TolerationORM).where(schemas.TolerationORM.resource_class_id == class_id)
1✔
646
            await session.execute(stmt)
1✔
647

648
    @_only_admins
2✔
649
    async def get_affinities(
2✔
650
        self, api_user: base_models.APIUser, resource_pool_id: int, class_id: int
651
    ) -> list[models.NodeAffinity]:
652
        """Get all affinities for a resource class."""
653
        async with self.session_maker() as session:
2✔
654
            res_classes = await self.get_classes(api_user, class_id, resource_pool_id=resource_pool_id)
2✔
655
            if len(res_classes) == 0:
2✔
656
                raise errors.MissingResourceError(
1✔
657
                    message=f"The resource pool with ID {resource_pool_id} or the resource "
658
                    f"class with ID {class_id} do not exist, or they are not related."
659
                )
660
            stmt = select(schemas.NodeAffintyORM).where(schemas.NodeAffintyORM.resource_class_id == class_id)
1✔
661
            res = await session.execute(stmt)
1✔
662
            return [i.dump() for i in res.scalars().all()]
1✔
663

664
    @_only_admins
2✔
665
    async def delete_affinities(self, api_user: base_models.APIUser, resource_pool_id: int, class_id: int) -> None:
2✔
666
        """Delete all affinities from a resource class."""
667
        async with self.session_maker() as session, session.begin():
2✔
668
            res_classes = await self.get_classes(api_user, class_id, resource_pool_id=resource_pool_id)
2✔
669
            if len(res_classes) == 0:
2✔
670
                raise errors.MissingResourceError(
1✔
671
                    message=f"The resource pool with ID {resource_pool_id} or the resource "
672
                    f"class with ID {class_id} do not exist, or they are not related."
673
                )
674
            stmt = delete(schemas.NodeAffintyORM).where(schemas.NodeAffintyORM.resource_class_id == class_id)
1✔
675
            await session.execute(stmt)
1✔
676

677
    async def get_quota(self, api_user: base_models.APIUser, resource_pool_id: int) -> models.Quota:
2✔
678
        """Get the quota of a resource pool."""
679
        rps = await self.get_resource_pools(api_user=api_user, id=resource_pool_id)
2✔
680
        if len(rps) < 1:
2✔
681
            raise errors.MissingResourceError(message=f"Cannot find the resource pool with ID {resource_pool_id}.")
1✔
682
        rp = rps[0]
1✔
683
        if rp.quota is None:
1✔
684
            raise errors.MissingResourceError(
×
685
                message=f"The resource pool with ID {resource_pool_id} does not have a quota."
686
            )
687
        return rp.quota
1✔
688

689
    @_only_admins
2✔
690
    async def update_quota(
2✔
691
        self,
692
        api_user: base_models.APIUser,
693
        resource_pool_id: int,
694
        update: models.QuotaPatch,
695
        quota_put_id: str | None = None,
696
    ) -> models.Quota:
697
        """Update the quota of a resource pool."""
698
        rps = await self.get_resource_pools(api_user=api_user, id=resource_pool_id)
2✔
699
        if len(rps) < 1:
2✔
700
            raise errors.MissingResourceError(message=f"Cannot find the resource pool with ID {resource_pool_id}.")
1✔
701
        rp = rps[0]
1✔
702
        if rp.quota is None:
1✔
703
            raise errors.MissingResourceError(
×
704
                message=f"The resource pool with ID {resource_pool_id} does not have a quota."
705
            )
706
        old_quota = rp.quota
1✔
707
        new_quota = models.Quota(
1✔
708
            cpu=update.cpu if update.cpu is not None else old_quota.cpu,
709
            memory=update.memory if update.memory is not None else old_quota.memory,
710
            gpu=update.gpu if update.gpu is not None else old_quota.gpu,
711
            gpu_kind=update.gpu_kind if update.gpu_kind is not None else old_quota.gpu_kind,
712
            id=quota_put_id or old_quota.id,
713
        )
714
        if new_quota.id != old_quota.id:
1✔
715
            raise errors.ValidationError(message="The 'id' field of a quota is immutable.")
×
716

717
        for rc in rp.classes:
1✔
718
            if not new_quota.is_resource_class_compatible(rc):
1✔
719
                raise errors.ValidationError(
×
720
                    message=f"The quota {new_quota} is not compatible with the resource class {rc}."
721
                )
722

723
        return self.quotas_repo.update_quota(quota=new_quota)
1✔
724

725

726
@dataclass
2✔
727
class Respository2Users:
2✔
728
    """Information about which users can access a specific resource pool."""
729

730
    resource_pool_id: int
2✔
731
    allowed: list[base_models.User] = field(default_factory=list)
2✔
732
    disallowed: list[base_models.User] = field(default_factory=list)
2✔
733

734

735
class UserRepository(_Base):
2✔
736
    """The adapter used for accessing resource pool users with SQLAlchemy."""
737

738
    def __init__(
2✔
739
        self, session_maker: Callable[..., AsyncSession], quotas_repo: QuotaRepository, user_repo: UserRepo
740
    ) -> None:
741
        super().__init__(session_maker, quotas_repo)
2✔
742
        self.kc_user_repo = user_repo
2✔
743

744
    @_only_admins
2✔
745
    async def get_resource_pool_users(
2✔
746
        self,
747
        *,
748
        api_user: base_models.APIUser,
749
        resource_pool_id: int,
750
        keycloak_id: Optional[str] = None,
751
    ) -> Respository2Users:
752
        """Get users of a specific resource pool from the database."""
753
        async with self.session_maker() as session, session.begin():
2✔
754
            stmt = (
2✔
755
                select(schemas.ResourcePoolORM)
756
                .where(schemas.ResourcePoolORM.id == resource_pool_id)
757
                .options(selectinload(schemas.ResourcePoolORM.users))
758
            )
759
            if keycloak_id is not None:
2✔
760
                stmt = stmt.join(schemas.ResourcePoolORM.users, isouter=True).where(
1✔
761
                    or_(
762
                        schemas.UserORM.keycloak_id == keycloak_id,
763
                        schemas.ResourcePoolORM.public == true(),
764
                        schemas.ResourceClassORM.default == true(),
765
                    )
766
                )
767
            res = await session.execute(stmt)
2✔
768
            rp = res.scalars().first()
2✔
769
            if rp is None:
2✔
770
                raise errors.MissingResourceError(message=f"Resource pool with id {resource_pool_id} does not exist")
1✔
771
            specific_user: base_models.User | None = None
1✔
772
            if keycloak_id:
1✔
773
                specific_user_res = (
×
774
                    await session.execute(select(schemas.UserORM).where(schemas.UserORM.keycloak_id == keycloak_id))
775
                ).scalar_one_or_none()
776
                specific_user = None if not specific_user_res else specific_user_res.dump()
×
777
            allowed: list[base_models.User] = []
1✔
778
            disallowed: list[base_models.User] = []
1✔
779
            if rp.default:
1✔
780
                disallowed_stmt = select(schemas.UserORM).where(schemas.UserORM.no_default_access == true())
1✔
781
                if keycloak_id:
1✔
782
                    disallowed_stmt = disallowed_stmt.where(schemas.UserORM.keycloak_id == keycloak_id)
×
783
                disallowed_res = await session.execute(disallowed_stmt)
1✔
784
                disallowed = [user.dump() for user in disallowed_res.scalars().all()]
1✔
785
                if specific_user and specific_user not in disallowed:
1✔
786
                    allowed = [specific_user]
×
787
            elif rp.public and not rp.default:
1✔
788
                if specific_user:
×
789
                    allowed = [specific_user]
×
790
            elif not rp.public and not rp.default:
1✔
791
                allowed = [user.dump() for user in rp.users]
1✔
792
            return Respository2Users(rp.id, allowed, disallowed)
1✔
793

794
    async def get_user_resource_pools(
2✔
795
        self,
796
        api_user: base_models.APIUser,
797
        keycloak_id: str,
798
        resource_pool_id: Optional[int] = None,
799
        resource_pool_name: Optional[str] = None,
800
    ) -> list[models.ResourcePool]:
801
        """Get resource pools that a specific user has access to."""
802
        async with self.session_maker() as session, session.begin():
2✔
803
            if not api_user.is_admin and api_user.id != keycloak_id:
2✔
804
                raise errors.ValidationError(
×
805
                    message="Users cannot query for resource pools that belong to other users."
806
                )
807

808
            stmt = select(schemas.ResourcePoolORM).options(selectinload(schemas.ResourcePoolORM.classes))
2✔
809
            stmt = stmt.where(
2✔
810
                or_(
811
                    schemas.ResourcePoolORM.public == true(),
812
                    schemas.ResourcePoolORM.users.any(schemas.UserORM.keycloak_id == keycloak_id),
813
                )
814
            )
815
            if resource_pool_name is not None:
2✔
816
                stmt = stmt.where(schemas.ResourcePoolORM.name == resource_pool_name)
×
817
            if resource_pool_id is not None:
2✔
818
                stmt = stmt.where(schemas.ResourcePoolORM.id == resource_pool_id)
1✔
819
            # NOTE: The line below ensures that the right users can access the right resources, do not remove.
820
            stmt = _resource_pool_access_control(api_user, stmt)
2✔
821
            res = await session.execute(stmt)
2✔
822
            rps: Sequence[schemas.ResourcePoolORM] = res.scalars().all()
2✔
823
            output: list[models.ResourcePool] = []
2✔
824
            for rp in rps:
2✔
825
                quota = self.quotas_repo.get_quota(rp.quota) if rp.quota else None
1✔
826
                output.append(rp.dump(quota))
1✔
827
            return output
2✔
828

829
    @_only_admins
2✔
830
    async def update_user_resource_pools(
2✔
831
        self, api_user: base_models.APIUser, keycloak_id: str, resource_pool_ids: list[int], append: bool = True
832
    ) -> list[models.ResourcePool]:
833
        """Update the resource pools that a specific user has access to."""
834
        async with self.session_maker() as session, session.begin():
1✔
835
            kc_user = await self.kc_user_repo.get_user(keycloak_id)
1✔
836
            if kc_user is None:
1✔
837
                raise errors.MissingResourceError(message=f"The user with ID {keycloak_id} does not exist")
×
838
            stmt = (
1✔
839
                select(schemas.UserORM)
840
                .where(schemas.UserORM.keycloak_id == keycloak_id)
841
                .options(selectinload(schemas.UserORM.resource_pools))
842
            )
843
            res = await session.execute(stmt)
1✔
844
            user = res.scalars().first()
1✔
845
            if user is None:
1✔
846
                user = schemas.UserORM(keycloak_id=keycloak_id)
1✔
847
                session.add(user)
1✔
848
            stmt_rp = (
1✔
849
                select(schemas.ResourcePoolORM)
850
                .where(schemas.ResourcePoolORM.id.in_(resource_pool_ids))
851
                .options(selectinload(schemas.ResourcePoolORM.classes))
852
            )
853
            if user.no_default_access:
1✔
854
                stmt_rp = stmt_rp.where(schemas.ResourcePoolORM.default == false())
×
855
            res_rp = await session.execute(stmt_rp)
1✔
856
            rps_to_add = res_rp.scalars().all()
1✔
857
            if len(rps_to_add) != len(resource_pool_ids):
1✔
858
                missing_rps = set(resource_pool_ids).difference(set([i.id for i in rps_to_add]))
×
859
                raise errors.MissingResourceError(
×
860
                    message=(
861
                        f"The resource pools with ids: {missing_rps} do not exist or user doesn't have access to "
862
                        "default resource pool."
863
                    )
864
                )
865
            if user.no_default_access:
1✔
866
                default_rp = next((rp for rp in rps_to_add if rp.default), None)
×
867
                if default_rp:
×
868
                    raise errors.ForbiddenError(
×
869
                        message=f"User with keycloak id {keycloak_id} cannot access the default resource pool"
870
                    )
871
            if append:
1✔
872
                user_rp_ids = {rp.id for rp in user.resource_pools}
1✔
873
                rps_to_add = [rp for rp in rps_to_add if rp.id not in user_rp_ids]
1✔
874
                user.resource_pools.extend(rps_to_add)
1✔
875
            else:
876
                user.resource_pools = list(rps_to_add)
1✔
877
            output: list[models.ResourcePool] = []
1✔
878
            for rp in rps_to_add:
1✔
879
                quota = self.quotas_repo.get_quota(rp.quota) if rp.quota else None
1✔
880
                output.append(rp.dump(quota))
1✔
881
            return output
1✔
882

883
    @_only_admins
2✔
884
    async def delete_resource_pool_user(
2✔
885
        self, api_user: base_models.APIUser, resource_pool_id: int, keycloak_id: str
886
    ) -> None:
887
        """Remove a user from a specific resource pool."""
888
        async with self.session_maker() as session, session.begin():
1✔
889
            sub = (
1✔
890
                select(schemas.UserORM.id)
891
                .join(schemas.ResourcePoolORM, schemas.UserORM.resource_pools)
892
                .where(schemas.UserORM.keycloak_id == keycloak_id)
893
                .where(schemas.ResourcePoolORM.id == resource_pool_id)
894
            )
895
            stmt = delete(schemas.resource_pools_users).where(schemas.resource_pools_users.c.user_id.in_(sub))
1✔
896
            await session.execute(stmt)
1✔
897

898
    @_only_admins
2✔
899
    async def update_resource_pool_users(
2✔
900
        self, api_user: base_models.APIUser, resource_pool_id: int, user_ids: Collection[str], append: bool = True
901
    ) -> list[base_models.User]:
902
        """Update the users to have access to a specific resource pool."""
903
        async with self.session_maker() as session, session.begin():
2✔
904
            stmt = (
2✔
905
                select(schemas.ResourcePoolORM)
906
                .where(schemas.ResourcePoolORM.id == resource_pool_id)
907
                .options(
908
                    selectinload(schemas.ResourcePoolORM.users),
909
                    selectinload(schemas.ResourcePoolORM.classes),
910
                )
911
            )
912
            res = await session.execute(stmt)
2✔
913
            rp: Optional[schemas.ResourcePoolORM] = res.scalars().first()
2✔
914
            if rp is None:
2✔
915
                raise errors.MissingResourceError(
1✔
916
                    message=f"The resource pool with id {resource_pool_id} does not exist"
917
                )
918
            if rp.default:
1✔
919
                # NOTE: If the resource pool is default just check if any users are prevented from having
920
                # default resource pool access - and remove the restriction.
921
                all_existing_users = await self.get_resource_pool_users(
1✔
922
                    api_user=api_user, resource_pool_id=resource_pool_id
923
                )
924
                users_to_modify = [user for user in all_existing_users.disallowed if user.keycloak_id in user_ids]
1✔
925
                return await gather(
1✔
926
                    *[
927
                        self.update_user(
928
                            api_user=api_user, keycloak_id=no_default_user.keycloak_id, no_default_access=False
929
                        )
930
                        for no_default_user in users_to_modify
931
                    ]
932
                )
933
            stmt_usr = select(schemas.UserORM).where(schemas.UserORM.keycloak_id.in_(user_ids))
1✔
934
            res_usr = await session.execute(stmt_usr)
1✔
935
            users_to_add_exist = res_usr.scalars().all()
1✔
936
            user_ids_to_add_exist = [i.keycloak_id for i in users_to_add_exist]
1✔
937
            users_to_add_missing = [
1✔
938
                schemas.UserORM(keycloak_id=user_id) for user_id in user_ids if user_id not in user_ids_to_add_exist
939
            ]
940
            if append:
1✔
941
                rp_user_ids = {rp.id for rp in rp.users}
1✔
942
                users_to_add = [u for u in list(users_to_add_exist) + users_to_add_missing if u.id not in rp_user_ids]
1✔
943
                rp.users.extend(users_to_add)
1✔
944
            else:
945
                rp.users = list(users_to_add_exist) + users_to_add_missing
1✔
946
            return [usr.dump() for usr in rp.users]
1✔
947

948
    @_only_admins
2✔
949
    async def update_user(self, api_user: base_models.APIUser, keycloak_id: str, **kwargs: Any) -> base_models.User:
2✔
950
        """Update a specific user."""
951
        async with self.session_maker() as session, session.begin():
1✔
952
            stmt = select(schemas.UserORM).where(schemas.UserORM.keycloak_id == keycloak_id)
1✔
953
            res = await session.execute(stmt)
1✔
954
            user: Optional[schemas.UserORM] = res.scalars().first()
1✔
955
            if not user:
1✔
956
                user = schemas.UserORM(keycloak_id=keycloak_id)
1✔
957
                session.add(user)
1✔
958
            allowed_updates = {"no_default_access"}
1✔
959
            if not set(kwargs.keys()).issubset(allowed_updates):
1✔
960
                raise errors.ValidationError(
×
961
                    message=f"Only the following fields {allowed_updates} can be updated for a resource pool user.."
962
                )
963
            if (no_default_access := kwargs.get("no_default_access")) is not None:
1✔
964
                user.no_default_access = no_default_access
1✔
965
            return user.dump()
1✔
966

967

968
@dataclass
2✔
969
class ClusterRepository:
2✔
970
    """Repository for cluster configurations."""
971

972
    session_maker: Callable[..., AsyncSession]
2✔
973

974
    async def select_all(self, cluster_id: ULID | None = None) -> AsyncGenerator[SavedClusterSettings, Any]:
2✔
975
        """Get cluster configurations from the database."""
976
        async with self.session_maker() as session:
2✔
977
            query = select(ClusterORM)
2✔
978
            if cluster_id is not None:
2✔
979
                query = query.where(ClusterORM.id == cluster_id)
2✔
980

981
            clusters = await session.stream_scalars(query)
2✔
982
            async for cluster in clusters:
2✔
983
                yield cluster.dump()
1✔
984

985
    async def select(self, cluster_id: ULID) -> SavedClusterSettings:
2✔
986
        """Get cluster configurations from the database."""
987
        async for cluster in self.select_all(cluster_id):
2✔
988
            return cluster
1✔
989

990
        raise errors.MissingResourceError(message=f"Cluster definition id='{cluster_id}' does not exist.")
2✔
991

992
    @_only_admins
2✔
993
    async def insert(self, api_user: base_models.APIUser, cluster: ClusterSettings) -> ClusterSettings:
2✔
994
        """Creates a new cluster configuration."""
995

996
        cluster_orm = ClusterORM.load(cluster)
2✔
997
        async with self.session_maker() as session, session.begin():
2✔
998
            session.add(cluster_orm)
2✔
999
            await session.flush()
2✔
1000
            await session.refresh(cluster_orm)
2✔
1001

1002
            return cluster_orm.dump()
2✔
1003

1004
    @_only_admins
2✔
1005
    async def update(self, api_user: base_models.APIUser, cluster: ClusterPatch, cluster_id: ULID) -> ClusterSettings:
2✔
1006
        """Updates a cluster configuration."""
1007

1008
        async with self.session_maker() as session, session.begin():
2✔
1009
            saved_cluster = (await session.scalars(select(ClusterORM).where(ClusterORM.id == cluster_id))).one_or_none()
2✔
1010
            if saved_cluster is None:
2✔
1011
                raise errors.MissingResourceError(message=f"Cluster definition id='{cluster_id}' does not exist.")
2✔
1012

1013
            for key, value in asdict(cluster).items():
1✔
1014
                match key, value:
1✔
1015
                    case "session_protocol", SessionProtocol():
1✔
1016
                        setattr(saved_cluster, key, value.value)
1✔
1017
                    case "session_storage_class", "":
1✔
1018
                        # If we received an empty string in the storage class, reset it to the default storage class by
1019
                        # setting it to None.
1020
                        setattr(saved_cluster, key, None)
×
1021
                    case "service_account_name", "":
1✔
1022
                        # If we received an empty string in the service account name, set it back to None.
1023
                        setattr(saved_cluster, key, None)
×
1024
                    case _, None:
1✔
1025
                        # Do not modify a value which has not been set in the patch
1026
                        pass
1✔
1027
                    case _, _:
1✔
1028
                        setattr(saved_cluster, key, value)
1✔
1029

1030
            await session.flush()
1✔
1031
            await session.refresh(saved_cluster)
1✔
1032

1033
            return saved_cluster.dump()
1✔
1034

1035
    @_only_admins
2✔
1036
    async def delete(self, api_user: base_models.APIUser, cluster_id: ULID) -> None:
2✔
1037
        """Get cluster configurations from the database."""
1038

1039
        async with self.session_maker() as session, session.begin():
2✔
1040
            r = await session.scalars(select(ClusterORM).where(ClusterORM.id == cluster_id))
2✔
1041
            cluster = r.one_or_none()
2✔
1042
            if cluster is not None:
2✔
1043
                await session.delete(cluster)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc