• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 19430749134

17 Nov 2025 01:10PM UTC coverage: 86.399% (-0.04%) from 86.437%
19430749134

Pull #1115

github

web-flow
Merge 06a5cf42a into 88fbe3ee5
Pull Request #1115: fix: send identify() call only when necessary

80 of 95 new or added lines in 6 files covered. (84.21%)

6 existing lines in 5 files now uncovered.

23103 of 26740 relevant lines covered (86.4%)

1.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.5
/components/renku_data_services/users/db.py
1
"""Database adapters and helpers for users."""
2

3
from __future__ import annotations
2✔
4

5
import secrets
2✔
6
from abc import abstractmethod
2✔
7
from collections.abc import AsyncGenerator, Callable, Mapping
2✔
8
from dataclasses import dataclass, field
2✔
9
from datetime import UTC, datetime, timedelta
2✔
10
from typing import Any, Protocol, cast
2✔
11

12
from cryptography.hazmat.primitives.asymmetric import rsa
2✔
13
from sqlalchemy import delete, func, select
2✔
14
from sqlalchemy.ext.asyncio import AsyncSession
2✔
15

16
from renku_data_services import base_models
2✔
17
from renku_data_services.app_config import logging
2✔
18
from renku_data_services.authz.authz import Authz, AuthzOperation, ResourceType
2✔
19
from renku_data_services.base_api.auth import APIUser, only_authenticated
2✔
20
from renku_data_services.base_models.core import InternalServiceAdmin, ServiceAdminId
2✔
21
from renku_data_services.base_models.metrics import MetricsService, UserIdentity
2✔
22
from renku_data_services.base_models.nel import Nel
2✔
23
from renku_data_services.errors import errors
2✔
24
from renku_data_services.namespace.db import GroupRepository
2✔
25
from renku_data_services.namespace.orm import NamespaceORM
2✔
26
from renku_data_services.search.db import SearchUpdatesRepo
2✔
27
from renku_data_services.search.decorators import update_search_document
2✔
28
from renku_data_services.users.config import UserPreferencesConfig
2✔
29
from renku_data_services.users.kc_api import IKeycloakAPI
2✔
30
from renku_data_services.users.models import (
2✔
31
    DeletedUser,
32
    KeycloakAdminEvent,
33
    PinnedProjects,
34
    UnsavedUserInfo,
35
    UserInfo,
36
    UserInfoFieldUpdate,
37
    UserInfoUpdate,
38
    UserPatch,
39
    UserPreferences,
40
)
41
from renku_data_services.users.orm import LastKeycloakEventTimestamp, UserMetricsORM, UserORM, UserPreferencesORM
2✔
42
from renku_data_services.utils.core import with_db_transaction
2✔
43
from renku_data_services.utils.cryptography import (
2✔
44
    decrypt_string,
45
    encrypt_rsa,
46
    encrypt_string,
47
    generate_random_encryption_key,
48
)
49

50
logger = logging.getLogger(__name__)
2✔
51

52

53
class UsernameResolver(Protocol):
2✔
54
    """Resolve usernames to their ids."""
55

56
    @abstractmethod
2✔
57
    async def resolve_usernames(self, names: Nel[str]) -> Mapping[str, str]:
2✔
58
        """Return a map of username->user_id tuples."""
59
        ...
×
60

61

62
class DbUsernameResolver(UsernameResolver, Protocol):
2✔
63
    """Resolve usernames using the database."""
64

65
    @abstractmethod
2✔
66
    def make_session(self) -> AsyncSession:
2✔
67
        """Create a db session."""
68
        ...
×
69

70
    async def resolve_usernames(self, names: Nel[str]) -> dict[str, str]:
2✔
71
        """Resolve usernames to their user ids."""
72
        async with self.make_session() as session, session.begin():
1✔
73
            result = await session.execute(
1✔
74
                select(NamespaceORM.slug, NamespaceORM.user_id).where(
75
                    NamespaceORM.slug.in_(names), NamespaceORM.user_id.is_not(None)
76
                )
77
            )
78
            ret: dict[str, str] = {}
1✔
79
            for slug, id in result:
1✔
80
                ret.update({slug: id})
1✔
81

82
            return ret
1✔
83

84

85
@dataclass
2✔
86
class UserRepo(DbUsernameResolver):
2✔
87
    """An adapter for accessing users from the database."""
88

89
    session_maker: Callable[..., AsyncSession]
2✔
90
    group_repo: GroupRepository
2✔
91
    search_updates_repo: SearchUpdatesRepo
2✔
92
    encryption_key: bytes | None = field(repr=False)
2✔
93
    metrics: MetricsService
2✔
94
    authz: Authz
2✔
95

96
    def __post_init__(self) -> None:
2✔
97
        self._users_sync = UsersSync(self.session_maker, self.group_repo, self, self.metrics, self.authz)
2✔
98

99
    def make_session(self) -> AsyncSession:
2✔
100
        """Create a db session."""
101
        return self.session_maker()
1✔
102

103
    async def initialize(self, kc_api: IKeycloakAPI) -> None:
2✔
104
        """Do a total sync of users from Keycloak if there is nothing in the DB."""
105
        users = await self._get_users()
2✔
106
        if len(users) > 0:
2✔
107
            return
1✔
108
        await self._users_sync.users_sync(kc_api)
2✔
109

110
    async def _add_api_user(self, user: APIUser) -> UserInfo:
2✔
111
        if not user.id:
1✔
112
            raise errors.UnauthorizedError(message="The user has to be authenticated to be inserted in the DB.")
×
113
        result = await self._users_sync.update_or_insert_user(
1✔
114
            user=UnsavedUserInfo(
115
                id=user.id,
116
                email=user.email,
117
                first_name=user.first_name,
118
                last_name=user.last_name,
119
            )
120
        )
121
        return result.new
1✔
122

123
    async def get_user(self, id: str) -> UserInfo | None:
2✔
124
        """Get a specific user from the database."""
125
        async with self.session_maker() as session:
2✔
126
            result = await session.scalars(select(UserORM).where(UserORM.keycloak_id == id))
2✔
127
            user = result.one_or_none()
2✔
128
            if user is None:
2✔
129
                return None
2✔
130
            if user.namespace is None:
2✔
131
                raise errors.ProgrammingError(message=f"Cannot find a user namespace for user {id}.")
×
132
            return user.namespace.dump_user()
2✔
133

134
    async def get_or_create_user(self, requested_by: APIUser, id: str) -> UserInfo | None:
2✔
135
        """Get a specific user from the database and potentially create it if it does not exist.
136

137
        If the caller is the same user that is being retrieved and they are authenticated and
138
        their user information is not in the database then this call adds the user in the DB
139
        in addition to returning the user information.
140
        """
141
        async with self.session_maker() as session, session.begin():
2✔
142
            user = await self.get_user(id=id)
2✔
143
            if not user and id == requested_by.id:
2✔
144
                return await self._add_api_user(requested_by)
1✔
145
            return user
2✔
146

147
    @only_authenticated
2✔
148
    async def get_users(self, requested_by: APIUser, email: str | None = None) -> list[UserInfo]:
2✔
149
        """Get users from the database."""
150
        if not email and not requested_by.is_admin:
2✔
151
            raise errors.ForbiddenError(message="Non-admin users cannot list all users.")
1✔
152
        users = await self._get_users(email)
2✔
153

154
        is_api_user_missing = not any([requested_by.id == user.id for user in users])
2✔
155

156
        if not email and is_api_user_missing:
2✔
157
            api_user_info = await self._add_api_user(requested_by)
1✔
158
            users.append(api_user_info)
1✔
159
        return users
2✔
160

161
    async def _get_users(self, email: str | None = None) -> list[UserInfo]:
2✔
162
        async with self.session_maker() as session:
2✔
163
            stmt = select(UserORM)
2✔
164
            if email:
2✔
165
                stmt = stmt.where(UserORM.email == email)
1✔
166
            result = await session.scalars(stmt)
2✔
167
            users = result.all()
2✔
168

169
            for user in users:
2✔
170
                if user.namespace is None:
2✔
171
                    raise errors.ProgrammingError(message=f"Cannot find a user namespace for user {id}.")
×
172

173
            return [user.dump() for user in users if user.namespace is not None]
2✔
174

175
    async def get_all_users(self, requested_by: base_models.APIUser) -> AsyncGenerator[UserInfo, None]:
2✔
176
        """Get all users when reprovisioning."""
177
        if not requested_by.is_admin:
2✔
178
            raise errors.ForbiddenError(message="You do not have the required permissions for this operation.")
×
179

180
        async with self.session_maker() as session:
2✔
181
            users = await session.stream_scalars(select(UserORM))
2✔
182
            async for user in users:
2✔
183
                yield user.dump()
2✔
184

185
    async def remove_user(self, requested_by: APIUser, user_id: str) -> DeletedUser | None:
2✔
186
        """Remove a user."""
187
        logger.info(f"remove_user: Trying to remove user with ID {user_id}")
1✔
188
        return await self._remove_user(requested_by=requested_by, user_id=user_id)
1✔
189

190
    @with_db_transaction
2✔
191
    @Authz.authz_change(AuthzOperation.delete, ResourceType.user)
2✔
192
    @update_search_document
2✔
193
    async def _remove_user(
2✔
194
        self, requested_by: APIUser, user_id: str, *, session: AsyncSession | None = None
195
    ) -> DeletedUser | None:
196
        """Remove a user from the database."""
197
        if not session:
1✔
198
            raise errors.ProgrammingError(message="A database session is required")
×
199
        logger.info(f"Trying to remove user with ID {user_id}")
1✔
200
        result = await session.scalars(select(UserORM).where(UserORM.keycloak_id == user_id))
1✔
201
        user_orm = result.one_or_none()
1✔
202
        if user_orm is None:
1✔
203
            logger.info(f"User with ID {user_id} was not found.")
1✔
204
            return None
1✔
205
        await session.execute(delete(UserORM).where(UserORM.keycloak_id == user_id))
1✔
206
        logger.info(f"User with ID {user_id} was removed from the database.")
1✔
207
        logger.info(f"User namespace with ID {user_id} was removed from the authorization database.")
1✔
208
        return DeletedUser(id=user_id)
1✔
209

210
    @only_authenticated
2✔
211
    async def get_or_create_user_secret_key(self, requested_by: APIUser) -> str:
2✔
212
        """Get a user's secret encryption key or create it if it doesn't exist."""
213

214
        if self.encryption_key is None:
2✔
215
            raise errors.ConfigurationError(message="Encryption key is not set")
×
216

217
        async with self.session_maker() as session, session.begin():
2✔
218
            stmt = select(UserORM).where(UserORM.keycloak_id == requested_by.id)
2✔
219
            user = await session.scalar(stmt)
2✔
220
            if not user:
2✔
221
                raise errors.MissingResourceError(message=f"User with id {requested_by.id} not found")
×
222
            if user.secret_key is not None:
2✔
223
                return decrypt_string(self.encryption_key, user.keycloak_id, user.secret_key)
2✔
224
            # create a new secret key
225
            secret_key = secrets.token_urlsafe(32)
2✔
226
            user.secret_key = encrypt_string(self.encryption_key, user.keycloak_id, secret_key)
2✔
227
            session.add(user)
2✔
228

229
        return secret_key
2✔
230

231
    async def encrypt_user_secret(
2✔
232
        self,
233
        requested_by: base_models.APIUser,
234
        secret_service_public_key: rsa.RSAPublicKey,
235
        secret_value: str,
236
    ) -> tuple[bytes, bytes]:
237
        """Doubly encrypt a secret for a user.
238

239
        Since RSA cannot encrypt arbitrary length strings, we use symmetric encryption with a random key and encrypt the
240
        random key with RSA to get it to the secret service.
241
        """
242
        if requested_by.id is None:
2✔
243
            raise errors.ValidationError(message="APIUser has no id")
×
244

245
        user_secret_key = await self.get_or_create_user_secret_key(requested_by=requested_by)
2✔
246

247
        # encrypt once with user secret
248
        encrypted_value = encrypt_string(user_secret_key.encode(), requested_by.id, secret_value)
2✔
249
        # encrypt again with the secret service public key
250
        secret_svc_encryption_key = generate_random_encryption_key()
2✔
251
        doubly_encrypted_value = encrypt_string(secret_svc_encryption_key, requested_by.id, encrypted_value.decode())
2✔
252
        encrypted_key = encrypt_rsa(secret_service_public_key, secret_svc_encryption_key)
2✔
253
        return doubly_encrypted_value, encrypted_key
2✔
254

255

256
class UsersSync:
2✔
257
    """Sync users from Keycloak to the database."""
258

259
    def __init__(
2✔
260
        self,
261
        session_maker: Callable[..., AsyncSession],
262
        group_repo: GroupRepository,
263
        user_repo: UserRepo,
264
        metrics: MetricsService,
265
        authz: Authz,
266
    ) -> None:
267
        self.session_maker = session_maker
2✔
268
        self.group_repo = group_repo
2✔
269
        self.user_repo = user_repo
2✔
270
        self.metrics = metrics
2✔
271
        self.authz = authz
2✔
272
        self.search_updates_repo = user_repo.search_updates_repo
2✔
273

274
    async def _get_user(self, id: str) -> UserInfo | None:
2✔
275
        """Get a specific user."""
276
        async with self.session_maker() as session, session.begin():
2✔
277
            stmt = select(UserORM).where(UserORM.keycloak_id == id)
2✔
278
            res = await session.execute(stmt)
2✔
279
            orm = res.scalar_one_or_none()
2✔
280
            return orm.dump() if orm else None
2✔
281

282
    @with_db_transaction
2✔
283
    @Authz.authz_change(AuthzOperation.update_or_insert, ResourceType.user)
2✔
284
    @update_search_document
2✔
285
    async def update_or_insert_user(
2✔
286
        self, user: UnsavedUserInfo, *, session: AsyncSession | None = None
287
    ) -> UserInfoUpdate:
288
        """Update a user or insert it if it does not exist."""
289
        if not session:
2✔
290
            raise errors.ProgrammingError(message="A database session is required")
×
291
        res = await session.execute(select(UserORM).where(UserORM.keycloak_id == user.id))
2✔
292
        existing_user = res.scalar_one_or_none()
2✔
293
        if existing_user:
2✔
294
            return await self._update_user(
1✔
295
                session=session,
296
                user_id=user.id,
297
                existing_user=existing_user,
298
                patch=UserPatch.from_unsaved_user_info(user),
299
            )
300
        else:
301
            return await self._insert_user(session=session, user=user)
2✔
302

303
    async def _insert_user(self, session: AsyncSession, user: UnsavedUserInfo) -> UserInfoUpdate:
2✔
304
        """Insert a user."""
305
        slug = base_models.Slug.from_user(user.email, user.first_name, user.last_name, user.id).value
2✔
306
        namespace = await self.group_repo._create_user_namespace_slug(
2✔
307
            session, user_slug=slug, retry_enumerate=5, retry_random=True
308
        )
309
        slug = base_models.Slug.from_name(namespace)
2✔
310
        new_user = UserORM(
2✔
311
            keycloak_id=user.id,
312
            namespace=NamespaceORM(slug=slug.value, user_id=user.id),
313
            email=user.email,
314
            first_name=user.first_name,
315
            last_name=user.last_name,
316
        )
317
        new_user.namespace.user = new_user
2✔
318
        session.add(new_user)
2✔
319
        await session.flush()
2✔
320

321
        # Send the new user's identity for metrics
322
        result = new_user.dump()
2✔
323
        user_identity = await self.metrics.identify_user(user=result, existing_identity_hash=None, metadata={})
2✔
324
        # NOTE: We check against the UserIdentity class because of magic mocks in tests
325
        if isinstance(user_identity, UserIdentity):
2✔
NEW
326
            await session.refresh(new_user)
×
NEW
327
            user_metrics_res = await session.scalars(select(UserMetricsORM).where(UserMetricsORM.id == new_user.id))
×
NEW
328
            user_metrics_orm = user_metrics_res.one_or_none()
×
NEW
329
            if user_metrics_orm is None:
×
NEW
330
                user_metrics_orm = UserMetricsORM(id=new_user.id)
×
NEW
331
                session.add(user_metrics_orm)
×
NEW
332
            user_metrics_orm.metrics_identity_hash = user_identity.hash()
×
NEW
333
            await session.flush()
×
334

335
        return UserInfoUpdate(None, result)
2✔
336

337
    async def _update_user(
2✔
338
        self, session: AsyncSession, user_id: str, existing_user: UserORM | None, patch: UserPatch
339
    ) -> UserInfoUpdate:
340
        """Update a user."""
341
        if not existing_user:
1✔
342
            async with self.session_maker() as session, session.begin():
×
343
                res = await session.execute(select(UserORM).where(UserORM.keycloak_id == user_id))
×
344
                existing_user = res.scalar_one_or_none()
×
345
        if not existing_user:
1✔
346
            raise errors.MissingResourceError(message=f"The user with id '{user_id}' cannot be found")
×
347
        old_user = existing_user.dump()
1✔
348
        session.add(existing_user)  # reattach to session
1✔
349
        if patch.email is not None:
1✔
350
            existing_user.email = patch.email if patch.email else None
1✔
351
        if patch.first_name is not None:
1✔
352
            existing_user.first_name = patch.first_name if patch.first_name else None
1✔
353
        if patch.last_name is not None:
1✔
354
            existing_user.last_name = patch.last_name if patch.last_name else None
1✔
355
        namespace = await self.group_repo.get_user_namespace(user_id)
1✔
356
        if not namespace:
1✔
357
            raise errors.ProgrammingError(
×
358
                message=f"Cannot find a user namespace for user {user_id} when updating the user."
359
            )
360

361
        # Send the updated user's identity for metrics
362
        result = existing_user.dump()
1✔
363
        user_metrics_res = await session.scalars(select(UserMetricsORM).where(UserMetricsORM.id == existing_user.id))
1✔
364
        user_metrics_orm = user_metrics_res.one_or_none()
1✔
365
        metrics_identity_hash = user_metrics_orm.metrics_identity_hash if user_metrics_orm is not None else None
1✔
366
        user_identity = await self.metrics.identify_user(
1✔
367
            user=result, existing_identity_hash=metrics_identity_hash, metadata={}
368
        )
369
        # NOTE: We check against the UserIdentity class because of magic mocks in tests
370
        if isinstance(user_identity, UserIdentity):
1✔
NEW
371
            if user_metrics_orm is None:
×
NEW
372
                user_metrics_orm = UserMetricsORM(id=existing_user.id)
×
NEW
373
                session.add(user_metrics_orm)
×
NEW
374
            user_metrics_orm.metrics_identity_hash = user_identity.hash()
×
NEW
375
            await session.flush()
×
376

377
        return UserInfoUpdate(old_user, existing_user.dump())
1✔
378

379
    async def users_sync(self, kc_api: IKeycloakAPI) -> None:
2✔
380
        """Sync all users from Keycloak into the users database.
381

382
        This method also updates the users' data stored for product metrics.
383
        """
384
        logger.info("Starting a total user database sync.")
2✔
385
        kc_users = kc_api.get_users()
2✔
386

387
        async def _do_update(raw_kc_user: dict[str, Any]) -> None:
2✔
388
            kc_user = UnsavedUserInfo.from_kc_user_payload(raw_kc_user)
2✔
389
            logger.info(f"Checking user with Keycloak ID {kc_user.id}")
2✔
390
            db_user = await self._get_user(kc_user.id)
2✔
391
            if db_user is None or db_user.requires_update(current_user_info=kc_user):
2✔
392
                logger.info(f"Inserting or updating user {db_user} -> {kc_user}")
2✔
393
                update = await self.update_or_insert_user(user=kc_user)
2✔
394
                db_user = update.new
2✔
395

396
        # NOTE: If asyncio.gather is used here you quickly exhaust all DB connections
397
        # or timeout on waiting for available connections
398
        for user in kc_users:
2✔
399
            await _do_update(user)
2✔
400

401
    async def events_sync(self, kc_api: IKeycloakAPI) -> None:
2✔
402
        """Use the events from Keycloak to update the users database."""
403
        async with self.session_maker() as session, session.begin():
1✔
404
            res_count = await session.execute(select(func.count()).select_from(UserORM))
1✔
405
            count = res_count.scalar() or 0
1✔
406
            if count == 0:
1✔
407
                await self.users_sync(kc_api)
×
408
            logger.info("Starting periodic event sync.")
1✔
409
            stmt = select(LastKeycloakEventTimestamp)
1✔
410
            latest_utc_timestamp_orm = (await session.execute(stmt)).scalar_one_or_none()
1✔
411
            previous_sync_latest_utc_timestamp = (
1✔
412
                latest_utc_timestamp_orm.timestamp_utc if latest_utc_timestamp_orm is not None else None
413
            )
414
            logger.info(f"The previous sync latest event is {previous_sync_latest_utc_timestamp} UTC")
1✔
415
            now_utc = datetime.now(tz=UTC)
1✔
416
            start_date = now_utc.date() - timedelta(days=1)
1✔
417
            logger.info(f"Pulling events with a start date of {start_date} UTC")
1✔
418
            user_events = kc_api.get_user_events(start_date=start_date)
1✔
419
            update_admin_events = kc_api.get_admin_events(
1✔
420
                start_date=start_date, event_types=[KeycloakAdminEvent.CREATE, KeycloakAdminEvent.UPDATE]
421
            )
422
            delete_admin_events = kc_api.get_admin_events(
1✔
423
                start_date=start_date, event_types=[KeycloakAdminEvent.DELETE]
424
            )
425
            parsed_updates = UserInfoFieldUpdate.from_json_admin_events(update_admin_events)
1✔
426
            parsed_updates.extend(UserInfoFieldUpdate.from_json_user_events(user_events))
1✔
427
            parsed_deletions = UserInfoFieldUpdate.from_json_admin_events(delete_admin_events)
1✔
428
            parsed_updates = sorted(parsed_updates, key=lambda x: x.timestamp_utc)
1✔
429
            parsed_deletions = sorted(parsed_deletions, key=lambda x: x.timestamp_utc)
1✔
430
            if previous_sync_latest_utc_timestamp is not None:
1✔
431
                # Some events have already been processed - filter out old events we have seen
432
                logger.info(f"Filtering events older than {previous_sync_latest_utc_timestamp}")
1✔
433
                parsed_updates = [u for u in parsed_updates if u.timestamp_utc > previous_sync_latest_utc_timestamp]
1✔
434
                parsed_deletions = [u for u in parsed_deletions if u.timestamp_utc > previous_sync_latest_utc_timestamp]
1✔
435
            latest_update_timestamp = None
1✔
436
            latest_delete_timestamp = None
1✔
437
            for update in parsed_updates:
1✔
438
                logger.info(f"Processing update event {update}")
1✔
439
                # TODO: add typing to `update.field_name` for safer updates
440
                await self.update_or_insert_user(
1✔
441
                    user=UnsavedUserInfo(id=update.user_id, **{update.field_name: update.new_value})
442
                )
443
                latest_update_timestamp = update.timestamp_utc
1✔
444
            for deletion in parsed_deletions:
1✔
445
                logger.info(f"Processing deletion event {deletion}")
1✔
446
                await self.user_repo.remove_user(
1✔
447
                    requested_by=InternalServiceAdmin(id=ServiceAdminId.migrations), user_id=deletion.user_id
448
                )
449
                latest_delete_timestamp = deletion.timestamp_utc
1✔
450
            # Update the latest processed event timestamp
451
            current_sync_latest_utc_timestamp = latest_update_timestamp
1✔
452
            if latest_delete_timestamp is not None and (
1✔
453
                current_sync_latest_utc_timestamp is None or current_sync_latest_utc_timestamp < latest_delete_timestamp
454
            ):
455
                current_sync_latest_utc_timestamp = latest_delete_timestamp
1✔
456
            if current_sync_latest_utc_timestamp is not None:
1✔
457
                if latest_utc_timestamp_orm is None:
1✔
458
                    session.add(LastKeycloakEventTimestamp(current_sync_latest_utc_timestamp))
1✔
459
                    logger.info(
1✔
460
                        f"Inserted the latest sync event timestamp in the database: {current_sync_latest_utc_timestamp}"
461
                    )
462
                else:
463
                    latest_utc_timestamp_orm.timestamp_utc = current_sync_latest_utc_timestamp
1✔
464
                    logger.info(
1✔
465
                        f"Updated the latest sync event timestamp in the database: {current_sync_latest_utc_timestamp}"
466
                    )
467

468

469
@dataclass
2✔
470
class UserPreferencesRepository:
2✔
471
    """Repository for user preferences."""
472

473
    session_maker: Callable[..., AsyncSession]
2✔
474
    user_preferences_config: UserPreferencesConfig
2✔
475

476
    @only_authenticated
2✔
477
    async def get_user_preferences(
2✔
478
        self,
479
        requested_by: APIUser,
480
    ) -> UserPreferences:
481
        """Get user preferences from the database."""
482
        async with self.session_maker() as session:
2✔
483
            res = await session.scalars(select(UserPreferencesORM).where(UserPreferencesORM.user_id == requested_by.id))
2✔
484
            user_preferences = res.one_or_none()
2✔
485

486
            if user_preferences is None:
2✔
487
                raise errors.MissingResourceError(message="Preferences not found for user.")
1✔
488
            return user_preferences.dump()
1✔
489

490
    @only_authenticated
2✔
491
    async def delete_user_preferences(self, requested_by: APIUser) -> None:
2✔
492
        """Delete user preferences from the database."""
493
        async with self.session_maker() as session, session.begin():
1✔
494
            res = await session.scalars(select(UserPreferencesORM).where(UserPreferencesORM.user_id == requested_by.id))
1✔
495
            user_preferences = res.one_or_none()
1✔
496

497
            if user_preferences is None:
1✔
498
                return
×
499

500
            await session.delete(user_preferences)
1✔
501

502
    @only_authenticated
2✔
503
    async def add_pinned_project(self, requested_by: APIUser, project_slug: str) -> UserPreferences:
2✔
504
        """Adds a new pinned project to the user's preferences."""
505
        async with self.session_maker() as session, session.begin():
2✔
506
            res = await session.scalars(select(UserPreferencesORM).where(UserPreferencesORM.user_id == requested_by.id))
2✔
507
            user_preferences = res.one_or_none()
2✔
508

509
            if user_preferences is None:
2✔
510
                new_preferences = UserPreferences(
1✔
511
                    user_id=cast(str, requested_by.id),
512
                    pinned_projects=PinnedProjects(project_slugs=[project_slug]),
513
                )
514
                user_preferences = UserPreferencesORM.load(new_preferences)
1✔
515
                session.add(user_preferences)
1✔
516
                return user_preferences.dump()
1✔
517

518
            project_slugs: list[str]
519
            project_slugs = user_preferences.pinned_projects.get("project_slugs", [])
2✔
520

521
            # Do nothing if the project is already listed
522
            for slug in project_slugs:
2✔
523
                if project_slug.lower() == slug.lower():
2✔
524
                    return user_preferences.dump()
1✔
525

526
            # Check if we have reached the maximum number of pins
527
            if (
2✔
528
                self.user_preferences_config.max_pinned_projects > 0
529
                and len(project_slugs) >= self.user_preferences_config.max_pinned_projects
530
            ):
531
                raise errors.ValidationError(
1✔
532
                    message="Maximum number of pinned projects already allocated"
533
                    + f" (limit: {self.user_preferences_config.max_pinned_projects}, current: {len(project_slugs)})"
534
                )
535

536
            new_project_slugs = list(project_slugs) + [project_slug]
2✔
537
            pinned_projects = PinnedProjects(project_slugs=new_project_slugs).model_dump()
2✔
538
            user_preferences.pinned_projects = pinned_projects
2✔
539
            return user_preferences.dump()
2✔
540

541
    @only_authenticated
2✔
542
    async def remove_pinned_project(self, requested_by: APIUser, project_slug: str) -> UserPreferences:
2✔
543
        """Removes one or all pinned projects from the user's preferences."""
544
        async with self.session_maker() as session, session.begin():
2✔
545
            res = await session.scalars(select(UserPreferencesORM).where(UserPreferencesORM.user_id == requested_by.id))
2✔
546
            user_preferences = res.one_or_none()
2✔
547

548
            if user_preferences is None:
2✔
549
                raise errors.MissingResourceError(message="Preferences not found for user.")
×
550

551
            project_slugs: list[str]
552
            project_slugs = user_preferences.pinned_projects.get("project_slugs", [])
2✔
553

554
            # Remove all projects if `project_slug` is empty
555
            new_project_slugs = (
2✔
556
                [slug for slug in project_slugs if project_slug.lower() != slug.lower()] if project_slug else []
557
            )
558

559
            pinned_projects = PinnedProjects(project_slugs=new_project_slugs).model_dump()
2✔
560
            user_preferences.pinned_projects = pinned_projects
2✔
561
            return user_preferences.dump()
2✔
562

563
    @only_authenticated
2✔
564
    async def add_dismiss_project_migration_banner(self, requested_by: base_models.APIUser) -> UserPreferences:
2✔
565
        """Set the dismiss project migration banner to true."""
566
        async with self.session_maker() as session, session.begin():
2✔
567
            result = await session.scalars(
2✔
568
                select(UserPreferencesORM).where(UserPreferencesORM.user_id == cast(str, requested_by.id))
569
            )
570
            user_preferences_orm = result.one_or_none()
2✔
571
            if user_preferences_orm is None:
2✔
572
                user_preferences_orm = UserPreferencesORM(
2✔
573
                    user_id=cast(str, requested_by.id),
574
                    pinned_projects={"project_slugs": []},
575
                    show_project_migration_banner=False,
576
                )
577
                session.add(user_preferences_orm)
2✔
578
            else:
579
                user_preferences_orm.show_project_migration_banner = False
×
580

581
            await session.flush()
2✔
582
            await session.refresh(user_preferences_orm)
2✔
583
            return user_preferences_orm.dump()
2✔
584

585
    @only_authenticated
2✔
586
    async def remove_dismiss_project_migration_banner(self, requested_by: APIUser) -> UserPreferences:
2✔
587
        """Removes dismiss project migration banner from the user's preferences."""
588
        async with self.session_maker() as session, session.begin():
2✔
589
            res = await session.scalars(select(UserPreferencesORM).where(UserPreferencesORM.user_id == requested_by.id))
2✔
590
            user_preferences = res.one_or_none()
2✔
591

592
            if user_preferences is None:
2✔
593
                raise errors.MissingResourceError(message="Preferences not found for user.", quiet=True)
×
594

595
            user_preferences.show_project_migration_banner = True
2✔
596
            return user_preferences.dump()
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc