• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 15844898279

24 Jun 2025 08:06AM UTC coverage: 86.98% (-0.02%) from 86.999%
15844898279

Pull #903

github

web-flow
Merge 62759d788 into 4f5366ae7
Pull Request #903: feat: update to rclone v1.70.0+renku-1

2 of 2 new or added lines in 2 files covered. (100.0%)

6 existing lines in 4 files now uncovered.

21859 of 25131 relevant lines covered (86.98%)

1.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.93
/components/renku_data_services/users/db.py
1
"""Database adapters and helpers for users."""
2

3
import secrets
2✔
4
from collections.abc import AsyncGenerator, Callable
2✔
5
from dataclasses import dataclass, field
2✔
6
from datetime import UTC, datetime, timedelta
2✔
7
from typing import Any, cast
2✔
8

9
from sqlalchemy import delete, func, select
2✔
10
from sqlalchemy.ext.asyncio import AsyncSession
2✔
11

12
from renku_data_services import base_models
2✔
13
from renku_data_services.app_config import logging
2✔
14
from renku_data_services.authz.authz import Authz, AuthzOperation, ResourceType
2✔
15
from renku_data_services.base_api.auth import APIUser, only_authenticated
2✔
16
from renku_data_services.base_models.core import InternalServiceAdmin, ServiceAdminId
2✔
17
from renku_data_services.errors import errors
2✔
18
from renku_data_services.message_queue import events
2✔
19
from renku_data_services.message_queue.avro_models.io.renku.events import v2 as avro_schema_v2
2✔
20
from renku_data_services.message_queue.db import EventRepository
2✔
21
from renku_data_services.message_queue.interface import IMessageQueue
2✔
22
from renku_data_services.message_queue.redis_queue import dispatch_message
2✔
23
from renku_data_services.namespace.db import GroupRepository
2✔
24
from renku_data_services.namespace.orm import NamespaceORM
2✔
25
from renku_data_services.search.db import SearchUpdatesRepo
2✔
26
from renku_data_services.search.decorators import update_search_document
2✔
27
from renku_data_services.users.config import UserPreferencesConfig
2✔
28
from renku_data_services.users.kc_api import IKeycloakAPI
2✔
29
from renku_data_services.users.models import (
2✔
30
    DeletedUser,
31
    KeycloakAdminEvent,
32
    PinnedProjects,
33
    UnsavedUserInfo,
34
    UserInfo,
35
    UserInfoFieldUpdate,
36
    UserInfoUpdate,
37
    UserPatch,
38
    UserPreferences,
39
)
40
from renku_data_services.users.orm import LastKeycloakEventTimestamp, UserORM, UserPreferencesORM
2✔
41
from renku_data_services.utils.core import with_db_transaction
2✔
42
from renku_data_services.utils.cryptography import decrypt_string, encrypt_string
2✔
43

44
logger = logging.getLogger(__name__)
2✔
45

46

47
@dataclass
2✔
48
class UserRepo:
2✔
49
    """An adapter for accessing users from the database."""
50

51
    session_maker: Callable[..., AsyncSession]
2✔
52
    message_queue: IMessageQueue
2✔
53
    event_repo: EventRepository
2✔
54
    group_repo: GroupRepository
2✔
55
    search_updates_repo: SearchUpdatesRepo
2✔
56
    encryption_key: bytes | None = field(repr=False)
2✔
57
    authz: Authz
2✔
58

59
    def __post_init__(self) -> None:
        """Create the Keycloak user-sync helper from this repository's own collaborators."""
        sync_helper = UsersSync(
            self.session_maker,
            self.message_queue,
            self.event_repo,
            self.group_repo,
            self,
            self.authz,
        )
        self._users_sync = sync_helper
63

64
    async def initialize(self, kc_api: IKeycloakAPI) -> None:
        """Run a total sync of users from Keycloak, but only when no users are stored in the DB."""
        stored_users = await self._get_users()
        if not stored_users:
            await self._users_sync.users_sync(kc_api)
2✔
70

71
    async def _add_api_user(self, user: APIUser) -> UserInfo:
        """Store the given API user in the database and return the saved user information.

        Raises an unauthorized error when the API user carries no ID.
        """
        if not user.id:
            raise errors.UnauthorizedError(message="The user has to be authenticated to be inserted in the DB.")
        unsaved_user = UnsavedUserInfo(
            id=user.id,
            email=user.email,
            first_name=user.first_name,
            last_name=user.last_name,
        )
        outcome = await self._users_sync.update_or_insert_user(user=unsaved_user)
        return outcome.new
1✔
83

84
    async def get_user(self, id: str) -> UserInfo | None:
        """Look up a single user by Keycloak ID; return None when no such user is stored.

        Raises a programming error when the user exists but has no namespace.
        """
        async with self.session_maker() as session:
            query = select(UserORM).where(UserORM.keycloak_id == id)
            user_orm = (await session.scalars(query)).one_or_none()
            if user_orm is None:
                return None
            if user_orm.namespace is not None:
                return user_orm.namespace.dump_user()
            raise errors.ProgrammingError(message=f"Cannot find a user namespace for user {id}.")
2✔
94

95
    async def get_or_create_user(self, requested_by: APIUser, id: str) -> UserInfo | None:
        """Fetch a user by ID, creating the record when the caller is asking for themselves.

        When no user with the given ID is stored and the ID equals the ID of ``requested_by``,
        the caller is added to the database and the newly stored information is returned.
        In every other case the lookup result is returned as-is, which may be None.
        """
        # NOTE: The session opened here is not handed to the helpers below; they are called without it.
        async with self.session_maker() as session, session.begin():
            found = await self.get_user(id=id)
            caller_is_missing = not found and id == requested_by.id
            if caller_is_missing:
                return await self._add_api_user(requested_by)
            return found
2✔
107

108
    @only_authenticated
    async def get_users(self, requested_by: APIUser, email: str | None = None) -> list[UserInfo]:
        """List users from the database, optionally restricted to one email address.

        Only admins may list users without an email filter. When listing without a filter and
        the caller is not among the results, the caller is inserted and appended to the result.
        """
        filtering_by_email = bool(email)
        if not filtering_by_email and not requested_by.is_admin:
            raise errors.ForbiddenError(message="Non-admin users cannot list all users.")
        found_users = await self._get_users(email)

        caller_is_listed = any(requested_by.id == found.id for found in found_users)
        if filtering_by_email or caller_is_listed:
            return found_users

        found_users.append(await self._add_api_user(requested_by))
        return found_users
2✔
121

122
    async def _get_users(self, email: str | None = None) -> list[UserInfo]:
        """Load users from the database, optionally filtered by an exact email match.

        Args:
            email: When truthy, only users whose email equals this value are returned.

        Returns:
            The dumped user information of every matching user.

        Raises:
            errors.ProgrammingError: If any matching user has no namespace.
        """
        async with self.session_maker() as session:
            stmt = select(UserORM)
            if email:
                stmt = stmt.where(UserORM.email == email)
            users = (await session.scalars(stmt)).all()

            for user in users:
                if user.namespace is None:
                    # Report the ID of the offending user; a bare `id` here would format the builtin function.
                    raise errors.ProgrammingError(
                        message=f"Cannot find a user namespace for user {user.keycloak_id}."
                    )

            # Every user is known to have a namespace at this point, so no further filtering is needed.
            return [user.dump() for user in users]
2✔
135

136
    async def get_all_users(self, requested_by: base_models.APIUser) -> AsyncGenerator[UserInfo, None]:
        """Stream every stored user one at a time (used when reprovisioning); admins only."""
        if not requested_by.is_admin:
            raise errors.ForbiddenError(message="You do not have the required permissions for this operation.")

        async with self.session_maker() as session:
            user_stream = await session.stream_scalars(select(UserORM))
            async for user_orm in user_stream:
                yield user_orm.dump()
2✔
145

146
    async def remove_user(self, requested_by: APIUser, user_id: str) -> DeletedUser | None:
        """Log the removal attempt and delegate the deletion to `_remove_user`."""
        logger.info(f"remove_user: Trying to remove user with ID {user_id}")
        deleted = await self._remove_user(requested_by=requested_by, user_id=user_id)
        return deleted
1✔
150

151
    @with_db_transaction
    @Authz.authz_change(AuthzOperation.delete, ResourceType.user)
    @dispatch_message(avro_schema_v2.UserRemoved)
    @update_search_document
    async def _remove_user(
        self, requested_by: APIUser, user_id: str, *, session: AsyncSession | None = None
    ) -> DeletedUser | None:
        """Delete the user row with the given Keycloak ID.

        Returns a `DeletedUser` for the removed user, or None when no such user is stored.
        A session must be supplied; a programming error is raised otherwise.
        """
        if not session:
            raise errors.ProgrammingError(message="A database session is required")
        logger.info(f"Trying to remove user with ID {user_id}")
        lookup = await session.scalars(select(UserORM).where(UserORM.keycloak_id == user_id))
        if lookup.one_or_none() is None:
            logger.info(f"User with ID {user_id} was not found.")
            return None
        removal = delete(UserORM).where(UserORM.keycloak_id == user_id)
        await session.execute(removal)
        logger.info(f"User with ID {user_id} was removed from the database.")
        logger.info(f"User namespace with ID {user_id} was removed from the authorization database.")
        return DeletedUser(id=user_id)
1✔
171

172
    @only_authenticated
    async def get_or_create_user_secret_key(self, requested_by: APIUser) -> str:
        """Return the requesting user's secret key, generating and storing one if none exists.

        The key is stored encrypted with the repository's encryption key. A configuration error
        is raised when no encryption key is set, and a missing-resource error when the user is
        not in the database.
        """
        master_key = self.encryption_key
        if master_key is None:
            raise errors.ConfigurationError(message="Encryption key is not set")

        async with self.session_maker() as session, session.begin():
            user_orm = await session.scalar(select(UserORM).where(UserORM.keycloak_id == requested_by.id))
            if not user_orm:
                raise errors.MissingResourceError(message=f"User with id {requested_by.id} not found")
            if user_orm.secret_key is not None:
                return decrypt_string(master_key, user_orm.keycloak_id, user_orm.secret_key)
            # No key stored yet: generate one and persist its encrypted form
            new_secret = secrets.token_urlsafe(32)
            user_orm.secret_key = encrypt_string(master_key, user_orm.keycloak_id, new_secret)
            session.add(user_orm)

        # Returned only after the transaction block above has been exited
        return new_secret
2✔
192

193

194
class UsersSync:
2✔
195
    """Sync users from Keycloak to the database."""
196

197
    def __init__(
        self,
        session_maker: Callable[..., AsyncSession],
        message_queue: IMessageQueue,
        event_repo: EventRepository,
        group_repo: GroupRepository,
        user_repo: UserRepo,
        authz: Authz,
    ) -> None:
        """Keep references to the collaborators used while syncing users."""
        # Database access
        self.session_maker = session_maker
        # Repositories; the search-updates repository is borrowed from the user repository
        self.user_repo = user_repo
        self.group_repo = group_repo
        self.event_repo: EventRepository = event_repo
        self.search_updates_repo = user_repo.search_updates_repo
        # Messaging and authorization
        self.message_queue: IMessageQueue = message_queue
        self.authz = authz
2✔
213

214
    async def _get_user(self, id: str) -> UserInfo | None:
        """Fetch one user by Keycloak ID, or None when it is not stored."""
        async with self.session_maker() as session, session.begin():
            query = select(UserORM).where(UserORM.keycloak_id == id)
            found = (await session.execute(query)).scalar_one_or_none()
            if not found:
                return None
            return found.dump()
2✔
221

222
    @with_db_transaction
    @Authz.authz_change(AuthzOperation.update_or_insert, ResourceType.user)
    @update_search_document
    @dispatch_message(events.UpdateOrInsertUser)
    async def update_or_insert_user(
        self, user: UnsavedUserInfo, *, session: AsyncSession | None = None
    ) -> UserInfoUpdate:
        """Insert the user when it is not stored yet, otherwise patch the stored record.

        A session must be supplied; a programming error is raised otherwise.
        """
        if not session:
            raise errors.ProgrammingError(message="A database session is required")
        lookup = await session.execute(select(UserORM).where(UserORM.keycloak_id == user.id))
        stored_user = lookup.scalar_one_or_none()
        if not stored_user:
            return await self._insert_user(session=session, user=user)
        return await self._update_user(
            session=session,
            user_id=user.id,
            existing_user=stored_user,
            patch=UserPatch.from_unsaved_user_info(user),
        )
2✔
243

244
    async def _insert_user(self, session: AsyncSession, user: UnsavedUserInfo) -> UserInfoUpdate:
        """Create a new user row together with its namespace and return the insert result.

        The returned update has no old value and the dumped new user as the new value.
        """
        # Derive a candidate slug from the user details, then let the group repository pick the final name
        candidate_slug = base_models.Slug.from_user(user.email, user.first_name, user.last_name, user.id).value
        namespace_name = await self.group_repo._create_user_namespace_slug(
            session, user_slug=candidate_slug, retry_enumerate=5, retry_random=True
        )
        final_slug = base_models.Slug.from_name(namespace_name)
        namespace_orm = NamespaceORM(slug=final_slug.value, user_id=user.id)
        user_orm = UserORM(
            keycloak_id=user.id,
            namespace=namespace_orm,
            email=user.email,
            first_name=user.first_name,
            last_name=user.last_name,
        )
        user_orm.namespace.user = user_orm
        session.add(user_orm)
        await session.flush()
        return UserInfoUpdate(None, user_orm.dump())
2✔
262

263
    async def _update_user(
        self, session: AsyncSession, user_id: str, existing_user: UserORM | None, patch: UserPatch
    ) -> UserInfoUpdate:
        """Apply a patch to a stored user and return the user's state before and after.

        Args:
            session: The caller's session; all changes are made through it.
            user_id: Keycloak ID of the user to update.
            existing_user: The already loaded user, or None to have it looked up by ``user_id``.
            patch: New field values. A field that is None in the patch is left untouched; any other
                falsy value (for example an empty string) clears the stored field to None.

        Returns:
            A `UserInfoUpdate` holding the dumped user before and after the patch.

        Raises:
            errors.MissingResourceError: If no user with ``user_id`` can be found.
            errors.ProgrammingError: If the user has no namespace.
        """
        if not existing_user:
            # Look the user up through the caller's session. Opening a second session under the name
            # `session` would replace the caller's session with one that is already closed by the time
            # the changes below are applied, so they would never reach the caller's transaction.
            res = await session.execute(select(UserORM).where(UserORM.keycloak_id == user_id))
            existing_user = res.scalar_one_or_none()
        if not existing_user:
            raise errors.MissingResourceError(message=f"The user with id '{user_id}' cannot be found")
        old_user = existing_user.dump()
        session.add(existing_user)  # make sure the instance is attached to the caller's session
        if patch.email is not None:
            existing_user.email = patch.email or None
        if patch.first_name is not None:
            existing_user.first_name = patch.first_name or None
        if patch.last_name is not None:
            existing_user.last_name = patch.last_name or None
        # The namespace itself is not modified here; it only has to exist.
        namespace = await self.group_repo.get_user_namespace(user_id)
        if not namespace:
            raise errors.ProgrammingError(
                message=f"Cannot find a user namespace for user {user_id} when updating the user."
            )
        return UserInfoUpdate(old_user, existing_user.dump())
1✔
287

288
    async def users_sync(self, kc_api: IKeycloakAPI) -> None:
        """Bring every user reported by Keycloak into the users database.

        Each Keycloak user is compared with the stored user and inserted or updated when they differ.
        """
        logger.info("Starting a total user database sync.")
        # NOTE: Users are handled one after the other on purpose. If asyncio.gather is used here
        # you quickly exhaust all DB connections or timeout on waiting for available connections
        for raw_kc_user in kc_api.get_users():
            kc_user = UserInfo.from_kc_user_payload(raw_kc_user)
            logger.info(f"Checking user with Keycloak ID {kc_user.id}")
            db_user = await self._get_user(kc_user.id)
            if db_user != kc_user:
                logger.info(f"Inserting or updating user {db_user} -> {kc_user}")
                await self.update_or_insert_user(user=kc_user)
2✔
305

306
    async def events_sync(self, kc_api: IKeycloakAPI) -> None:
        """Apply recent Keycloak user and admin events to the users database.

        A total user sync is run first when the database holds no users. Events are pulled with a
        start date of yesterday (UTC), events that are not newer than the stored last-processed
        timestamp are dropped, and the rest is applied in ascending timestamp order: first the
        updates, then the deletions. The newest processed timestamp is stored afterwards.
        """
        async with self.session_maker() as session, session.begin():
            user_count = (await session.execute(select(func.count()).select_from(UserORM))).scalar() or 0
            if user_count == 0:
                await self.users_sync(kc_api)
            logger.info("Starting periodic event sync.")

            # Timestamp of the newest event handled by an earlier sync, if there is one
            timestamp_orm = (await session.execute(select(LastKeycloakEventTimestamp))).scalar_one_or_none()
            previous_timestamp = None
            if timestamp_orm is not None:
                previous_timestamp = timestamp_orm.timestamp_utc
            logger.info(f"The previous sync latest event is {previous_timestamp} UTC")

            start_date = datetime.now(tz=UTC).date() - timedelta(days=1)
            logger.info(f"Pulling events with a start date of {start_date} UTC")
            user_events = kc_api.get_user_events(start_date=start_date)
            update_admin_events = kc_api.get_admin_events(
                start_date=start_date, event_types=[KeycloakAdminEvent.CREATE, KeycloakAdminEvent.UPDATE]
            )
            delete_admin_events = kc_api.get_admin_events(
                start_date=start_date, event_types=[KeycloakAdminEvent.DELETE]
            )

            pending_updates = UserInfoFieldUpdate.from_json_admin_events(update_admin_events)
            pending_updates.extend(UserInfoFieldUpdate.from_json_user_events(user_events))
            pending_deletions = UserInfoFieldUpdate.from_json_admin_events(delete_admin_events)
            pending_updates = sorted(pending_updates, key=lambda event: event.timestamp_utc)
            pending_deletions = sorted(pending_deletions, key=lambda event: event.timestamp_utc)
            if previous_timestamp is not None:
                # Some events have already been processed - filter out old events we have seen
                logger.info(f"Filtering events older than {previous_timestamp}")
                pending_updates = [event for event in pending_updates if event.timestamp_utc > previous_timestamp]
                pending_deletions = [event for event in pending_deletions if event.timestamp_utc > previous_timestamp]

            newest_update_timestamp = None
            for update in pending_updates:
                logger.info(f"Processing update event {update}")
                # TODO: add typing to `update.field_name` for safer updates
                changed_field = {update.field_name: update.new_value}
                await self.update_or_insert_user(user=UnsavedUserInfo(id=update.user_id, **changed_field))
                newest_update_timestamp = update.timestamp_utc

            newest_delete_timestamp = None
            for deletion in pending_deletions:
                logger.info(f"Processing deletion event {deletion}")
                await self.user_repo.remove_user(
                    requested_by=InternalServiceAdmin(id=ServiceAdminId.migrations), user_id=deletion.user_id
                )
                newest_delete_timestamp = deletion.timestamp_utc

            # Update the latest processed event timestamp: the newer of the two, if anything was processed
            processed = [ts for ts in (newest_update_timestamp, newest_delete_timestamp) if ts is not None]
            if not processed:
                return
            newest_timestamp = max(processed)
            if timestamp_orm is None:
                session.add(LastKeycloakEventTimestamp(newest_timestamp))
                logger.info(f"Inserted the latest sync event timestamp in the database: {newest_timestamp}")
            else:
                timestamp_orm.timestamp_utc = newest_timestamp
                logger.info(f"Updated the latest sync event timestamp in the database: {newest_timestamp}")
372

373

374
@dataclass
2✔
375
class UserPreferencesRepository:
2✔
376
    """Repository for user preferences."""
377

378
    session_maker: Callable[..., AsyncSession]
2✔
379
    user_preferences_config: UserPreferencesConfig
2✔
380

381
    @only_authenticated
    async def get_user_preferences(self, requested_by: APIUser) -> UserPreferences:
        """Return the stored preferences of the requesting user; raise if there are none."""
        async with self.session_maker() as session:
            query = select(UserPreferencesORM).where(UserPreferencesORM.user_id == requested_by.id)
            preferences_orm = (await session.scalars(query)).one_or_none()
            if preferences_orm is not None:
                return preferences_orm.dump()
            raise errors.MissingResourceError(message="Preferences not found for user.")
1✔
394

395
    @only_authenticated
    async def delete_user_preferences(self, requested_by: APIUser) -> None:
        """Delete the requesting user's preferences; do nothing when there are none."""
        async with self.session_maker() as session, session.begin():
            query = select(UserPreferencesORM).where(UserPreferencesORM.user_id == requested_by.id)
            preferences_orm = (await session.scalars(query)).one_or_none()
            if preferences_orm is not None:
                await session.delete(preferences_orm)
1✔
406

407
    @only_authenticated
    async def add_pinned_project(self, requested_by: APIUser, project_slug: str) -> UserPreferences:
        """Pin a project for the requesting user and return the resulting preferences.

        Preferences are created when the user has none yet. Pinning a slug that is already pinned
        (compared case-insensitively) changes nothing. A validation error is raised when a positive
        pin limit is configured and has already been reached.
        """
        async with self.session_maker() as session, session.begin():
            query = select(UserPreferencesORM).where(UserPreferencesORM.user_id == requested_by.id)
            preferences_orm = (await session.scalars(query)).one_or_none()

            if preferences_orm is None:
                # No preferences yet: create them with this project as the only pin
                initial_preferences = UserPreferences(
                    user_id=cast(str, requested_by.id),
                    pinned_projects=PinnedProjects(project_slugs=[project_slug]),
                )
                preferences_orm = UserPreferencesORM.load(initial_preferences)
                session.add(preferences_orm)
                return preferences_orm.dump()

            current_slugs: list[str] = preferences_orm.pinned_projects.get("project_slugs", [])

            # Do nothing if the project is already listed
            already_pinned = any(project_slug.lower() == pinned.lower() for pinned in current_slugs)
            if already_pinned:
                return preferences_orm.dump()

            # Check if we have reached the maximum number of pins (a limit of zero or less disables the check)
            pin_limit = self.user_preferences_config.max_pinned_projects
            if pin_limit > 0 and len(current_slugs) >= pin_limit:
                raise errors.ValidationError(
                    message="Maximum number of pinned projects already allocated"
                    + f" (limit: {pin_limit}, current: {len(current_slugs)})"
                )

            updated_slugs = [*current_slugs, project_slug]
            preferences_orm.pinned_projects = PinnedProjects(project_slugs=updated_slugs).model_dump()
            return preferences_orm.dump()
2✔
445

446
    @only_authenticated
    async def remove_pinned_project(self, requested_by: APIUser, project_slug: str) -> UserPreferences:
        """Remove one or all pinned projects from the requesting user's preferences.

        A non-empty ``project_slug`` removes the matching pin (compared case-insensitively);
        an empty value removes every pin. Raises when the user has no preferences.
        """
        async with self.session_maker() as session, session.begin():
            query = select(UserPreferencesORM).where(UserPreferencesORM.user_id == requested_by.id)
            preferences_orm = (await session.scalars(query)).one_or_none()

            if preferences_orm is None:
                raise errors.MissingResourceError(message="Preferences not found for user.")

            current_slugs: list[str] = preferences_orm.pinned_projects.get("project_slugs", [])

            remaining_slugs: list[str] = []
            if project_slug:
                # Keep every pin except the requested one; an empty slug leaves the list empty
                remaining_slugs = [pinned for pinned in current_slugs if project_slug.lower() != pinned.lower()]

            preferences_orm.pinned_projects = PinnedProjects(project_slugs=remaining_slugs).model_dump()
            return preferences_orm.dump()
2✔
467

468
    @only_authenticated
    async def add_dismiss_project_migration_banner(self, requested_by: base_models.APIUser) -> UserPreferences:
        """Mark the project migration banner as dismissed for the requesting user.

        Sets ``show_project_migration_banner`` to False, creating preferences with no pinned
        projects when the user has none yet, and returns the refreshed preferences.
        """
        async with self.session_maker() as session, session.begin():
            lookup = await session.scalars(
                select(UserPreferencesORM).where(UserPreferencesORM.user_id == cast(str, requested_by.id))
            )
            preferences_orm = lookup.one_or_none()
            if preferences_orm is not None:
                preferences_orm.show_project_migration_banner = False
            else:
                preferences_orm = UserPreferencesORM(
                    user_id=cast(str, requested_by.id),
                    pinned_projects={"project_slugs": []},
                    show_project_migration_banner=False,
                )
                session.add(preferences_orm)

            # Write the change and reload the row before dumping it
            await session.flush()
            await session.refresh(preferences_orm)
            return preferences_orm.dump()
2✔
489

490
    @only_authenticated
    async def remove_dismiss_project_migration_banner(self, requested_by: APIUser) -> UserPreferences:
        """Undo the banner dismissal by setting ``show_project_migration_banner`` back to True.

        Raises a (quiet) missing-resource error when the user has no preferences.
        """
        async with self.session_maker() as session, session.begin():
            query = select(UserPreferencesORM).where(UserPreferencesORM.user_id == requested_by.id)
            preferences_orm = (await session.scalars(query)).one_or_none()

            if preferences_orm is None:
                raise errors.MissingResourceError(message="Preferences not found for user.", quiet=True)

            preferences_orm.show_project_migration_banner = True
            return preferences_orm.dump()
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc