• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 10353957042

12 Aug 2024 02:35PM UTC coverage: 90.758% (+0.4%) from 90.398%
10353957042

Pull #338

github

web-flow
Merge 3a49eb6c2 into 8afb94949
Pull Request #338: feat!: expand environments specification

227 of 237 new or added lines in 7 files covered. (95.78%)

48 existing lines in 9 files now uncovered.

9202 of 10139 relevant lines covered (90.76%)

1.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.07
/components/renku_data_services/secrets/db.py
1
"""Database repo for secrets."""
2✔
2

3
from collections.abc import AsyncGenerator, Callable, Sequence
2✔
4
from datetime import UTC, datetime
2✔
5
from typing import cast
2✔
6

7
from sqlalchemy import delete, select
2✔
8
from sqlalchemy.exc import IntegrityError
2✔
9
from sqlalchemy.ext.asyncio import AsyncSession
2✔
10
from ulid import ULID
2✔
11

12
from renku_data_services.base_api.auth import APIUser, only_authenticated
2✔
13
from renku_data_services.base_models.core import InternalServiceAdmin, ServiceAdminId
2✔
14
from renku_data_services.errors import errors
2✔
15
from renku_data_services.secrets.models import Secret, SecretKind
2✔
16
from renku_data_services.secrets.orm import SecretORM
2✔
17

18

19
class UserSecretsRepo:
2✔
20
    """An adapter for accessing users secrets."""
2✔
21

22
    def __init__(self, session_maker: Callable[..., AsyncSession]) -> None:
        """Keep the callable that the repository methods invoke to open a database session."""
        self.session_maker = session_maker
2✔
27

28
    @only_authenticated
2✔
29
    async def get_user_secrets(self, requested_by: APIUser, kind: SecretKind) -> list[Secret]:
        """Return every secret of the given kind that is owned by the requesting user."""
        query = select(SecretORM).where(SecretORM.user_id == requested_by.id, SecretORM.kind == kind)
        async with self.session_maker() as session:
            rows = await session.scalars(query)
            return [row.dump() for row in rows.all()]
2✔
36

37
    @only_authenticated
2✔
38
    async def get_secret_by_id(self, requested_by: APIUser, secret_id: ULID) -> Secret | None:
        """Look up one secret of the requesting user by id; return None when nothing matches."""
        query = select(SecretORM).where(SecretORM.user_id == requested_by.id, SecretORM.id == str(secret_id))
        async with self.session_maker() as session:
            found = (await session.execute(query)).scalar_one_or_none()
            return found.dump() if found is not None else None
1✔
47

48
    @only_authenticated
2✔
49
    async def get_secrets_by_ids(self, requested_by: APIUser, secret_ids: list[ULID]) -> list[Secret]:
        """Return the requesting user's secrets whose ids appear in ``secret_ids``."""
        # The ids are compared in their string form, as in the single-secret lookup.
        wanted_ids = [str(secret_id) for secret_id in secret_ids]
        query = select(SecretORM).where(SecretORM.user_id == requested_by.id, SecretORM.id.in_(wanted_ids))
        async with self.session_maker() as session:
            rows = await session.scalars(query)
            return [row.dump() for row in rows]
1✔
57

58
    @only_authenticated
2✔
59
    async def insert_secret(self, requested_by: APIUser, secret: Secret) -> Secret:
        """Insert a new secret owned by the requesting user.

        The stored modification date is the current UTC time truncated to whole seconds.
        The new row is flushed inside the transaction and its dumped form is returned.

        Raises:
            errors.ValidationError: when the flush fails with an integrity error whose message
                mentions ``UniqueViolationError`` (reported to the caller as a duplicate name).
            IntegrityError: any other integrity error is re-raised unchanged.
        """
        async with self.session_maker() as session, session.begin():
            modification_date = datetime.now(UTC).replace(microsecond=0)
            orm = SecretORM(
                name=secret.name,
                modification_date=modification_date,
                user_id=requested_by.id,
                encrypted_value=secret.encrypted_value,
                encrypted_key=secret.encrypted_key,
                kind=secret.kind,
            )
            session.add(orm)

            try:
                # Flush explicitly so constraint violations surface here and can be translated.
                await session.flush()
            except IntegrityError as err:
                if err.args and "UniqueViolationError" in err.args[0]:
                    # Chain the database error so the root cause stays visible in tracebacks.
                    raise errors.ValidationError(
                        message="The name for the secret should be unique but it already exists",
                        detail="Please modify the name field and then retry",
                    ) from err
                raise
            return orm.dump()
2✔
85

86
    @only_authenticated
2✔
87
    async def update_secret(
        self, requested_by: APIUser, secret_id: ULID, encrypted_value: bytes, encrypted_key: bytes
    ) -> Secret:
        """Replace the encrypted value and key of one of the requesting user's secrets.

        The modification date is reset to the current UTC time truncated to whole seconds.

        Raises:
            errors.MissingResourceError: when the user has no secret with the given id.
        """
        async with self.session_maker() as session, session.begin():
            result = await session.execute(
                select(SecretORM).where(SecretORM.id == str(secret_id)).where(SecretORM.user_id == requested_by.id)
            )
            secret = result.scalar_one_or_none()
            if secret is None:
                raise errors.MissingResourceError(message=f"The secret with id '{secret_id}' cannot be found")

            secret.encrypted_value = encrypted_value
            secret.encrypted_key = encrypted_key
            secret.modification_date = datetime.now(UTC).replace(microsecond=0)

            # Flush and dump while the session is still open, mirroring insert_secret. Dumping
            # after the transaction has committed only works when the session factory disables
            # expire_on_commit, because expired attributes cannot be reloaded on a closed session.
            await session.flush()
            return secret.dump()
1✔
104

105
    @only_authenticated
2✔
106
    async def delete_secret(self, requested_by: APIUser, secret_id: ULID) -> None:
        """Delete one of the requesting user's secrets.

        Nothing is deleted, and no error is raised, when the user has no secret with that id.
        """
        # A single DELETE filtered by both id and owner replaces the previous SELECT-then-DELETE
        # pair: the same rows are affected with one round trip to the database.
        stmt = delete(SecretORM).where(SecretORM.id == str(secret_id)).where(SecretORM.user_id == requested_by.id)
        async with self.session_maker() as session, session.begin():
            await session.execute(stmt)
1✔
118

119
    async def get_all_secrets_batched(
        self, requested_by: InternalServiceAdmin, batch_size: int = 100
    ) -> AsyncGenerator[Sequence[tuple[Secret, str]], None]:
        """Yield all stored secrets in batches of at most ``batch_size``.

        Only for internal use.

        Each batch is a list of ``(secret, user_id)`` pairs. Rows are read ordered by
        ``SecretORM.id`` using limit/offset paging, with a fresh session per batch.
        Iteration stops at the first empty page.

        Raises:
            errors.ProgrammingError: when the caller is not the secrets_rotation service admin.
                As this is an async generator, the check runs when iteration starts.
        """
        if requested_by.id != ServiceAdminId.secrets_rotation:
            raise errors.ProgrammingError(message="Only secrets_rotation admin is allowed to call this method.")
        offset = 0
        while True:
            async with self.session_maker() as session, session.begin():
                result = await session.execute(
                    select(SecretORM).order_by(SecretORM.id).limit(batch_size).offset(offset)
                )
                secrets = [(s.dump(), cast(str, s.user_id)) for s in result.scalars()]

            # Hand the batch over only after the session has been closed, so no transaction or
            # connection is held open while the consumer processes the batch.
            if not secrets:
                break

            yield secrets

            offset += batch_size
1✔
141

142
    async def update_secrets(self, requested_by: InternalServiceAdmin, secrets: list[Secret]) -> None:
        """Overwrite the encrypted value and key of several existing secrets in one transaction.

        Only for internal use.
        """
        if requested_by.id != ServiceAdminId.secrets_rotation:
            raise errors.ProgrammingError(message="Only secrets_rotation admin is allowed to call this method.")
        replacements = {s.id: s for s in secrets}

        async with self.session_maker() as session, session.begin():
            query = select(SecretORM).where(SecretORM.id.in_(replacements.keys()))
            stored = list(await session.scalars(query))

            # Every requested id must exist; otherwise report the ones that were not found.
            stored_ids = {row.id for row in stored}
            if len(replacements) != len(stored_ids):
                missing = replacements.keys() - stored_ids
                raise errors.MissingResourceError(message=f"Couldn't find secrets with ids: '{missing}'")

            for row in stored:
                replacement = replacements[row.id]
                row.encrypted_value = replacement.encrypted_value
                row.encrypted_key = replacement.encrypted_key
                row.modification_date = datetime.now(UTC).replace(microsecond=0)

            await session.flush()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc