• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 12256793455

10 Dec 2024 01:09PM UTC coverage: 85.742% (+0.001%) from 85.741%
12256793455

Pull #541

github

web-flow
Merge ad17872c1 into 4c61b920b
Pull Request #541: feat: add kpack k8s resources

14 of 16 new or added lines in 3 files covered. (87.5%)

6 existing lines in 3 files now uncovered.

14529 of 16945 relevant lines covered (85.74%)

1.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.13
/components/renku_data_services/base_api/auth.py
1
"""Authentication decorators for Sanic."""
2

3
import asyncio
2✔
4
import re
2✔
5
from collections.abc import Callable, Coroutine
2✔
6
from functools import wraps
2✔
7
from typing import Any, Concatenate, ParamSpec, TypeVar, cast
2✔
8

9
from sanic import Request
2✔
10

11
from renku_data_services import errors
2✔
12
from renku_data_services.base_models import AnyAPIUser, APIUser, Authenticator
2✔
13

14
_T = TypeVar("_T")
2✔
15
_P = ParamSpec("_P")
2✔
16

17

18
def authenticate(
    authenticator: Authenticator,
) -> Callable[
    [Callable[Concatenate[Request, AnyAPIUser, _P], Coroutine[Any, Any, _T]]],
    Callable[Concatenate[Request, _P], Coroutine[Any, Any, _T]],
]:
    """Create a Sanic handler decorator that resolves the requesting user.

    The token is read from the request header named by ``authenticator.token_field`` and handed to
    ``authenticator.authenticate``. Whatever user object that call returns is passed to the wrapped
    handler as its second positional argument, directly after the request. The decorator performs no
    permission checks of its own, so the handler is reached for any user the authenticator returns.
    """

    def decorator(
        f: Callable[Concatenate[Request, AnyAPIUser, _P], Coroutine[Any, Any, _T]],
    ) -> Callable[Concatenate[Request, _P], Coroutine[Any, Any, _T]]:
        @wraps(f)
        async def with_user(request: Request, *args: _P.args, **kwargs: _P.kwargs) -> _T:
            # A missing or empty header is forwarded to the authenticator as an empty token.
            header_value = request.headers.get(authenticator.token_field)
            api_user = await authenticator.authenticate(header_value or "", request)
            return await f(request, api_user, *args, **kwargs)

        return with_user

    return decorator
42

43

44
def authenticate_2(
    authenticator1: Authenticator,
    authenticator2: Authenticator,
) -> Callable[
    [Callable[Concatenate[Request, AnyAPIUser, AnyAPIUser, _P], Coroutine[Any, Any, _T]]],
    Callable[Concatenate[Request, _P], Coroutine[Any, Any, _T]],
]:
    """Create a Sanic handler decorator that resolves two users from the same request.

    Each authenticator reads its own header (``token_field``) and both ``authenticate`` calls are
    awaited together. The wrapped handler receives the request, then the user from ``authenticator1``,
    then the user from ``authenticator2``, followed by the original arguments.
    """

    def decorator(
        f: Callable[Concatenate[Request, AnyAPIUser, AnyAPIUser, _P], Coroutine[Any, Any, _T]],
    ) -> Callable[Concatenate[Request, _P], Coroutine[Any, Any, _T]]:
        @wraps(f)
        async def with_both_users(request: Request, *args: _P.args, **kwargs: _P.kwargs) -> _T:
            # Missing headers are passed on as empty tokens, one per authenticator.
            first_token = request.headers.get(authenticator1.token_field) or ""
            second_token = request.headers.get(authenticator2.token_field) or ""

            first_user: AnyAPIUser
            second_user: AnyAPIUser
            # gather() returns results in argument order, so the users line up with the authenticators.
            first_user, second_user = await asyncio.gather(
                authenticator1.authenticate(first_token, request),
                authenticator2.authenticate(second_token, request),
            )
            return await f(request, first_user, second_user, *args, **kwargs)

        return with_both_users

    return decorator
72

73

74
def validate_path_user_id(
2✔
75
    f: Callable[Concatenate[Request, _P], Coroutine[Any, Any, _T]],
76
) -> Callable[Concatenate[Request, _P], Coroutine[Any, Any, _T]]:
77
    """Decorator for a Sanic handler that validates the user_id or member_id path parameter."""
78
    _path_user_id_regex = re.compile(r"^[A-Za-z0-9]{1}[A-Za-z0-9-]+$")
2✔
79

80
    @wraps(f)
2✔
81
    async def decorated_function(request: Request, *args: _P.args, **kwargs: _P.kwargs) -> _T:
2✔
82
        user_id: str | None = cast(str | None, kwargs.get("user_id"))
2✔
83
        member_id: str | None = cast(str | None, kwargs.get("member_id"))
2✔
84
        if user_id and member_id:
2✔
85
            raise errors.ProgrammingError(
×
86
                message="Validating the user ID in a request path failed because matches for both"
87
                " 'user_id' and 'member_id' were found in the request handler parameters but only "
88
                "one match was expected."
89
            )
90
        user_id = user_id or member_id
2✔
91
        if not user_id:
2✔
92
            raise errors.ProgrammingError(
×
93
                message="Could not find 'user_id' or 'member_id' in the keyword arguments for the handler "
94
                "in order to validate it."
95
            )
96
        if not _path_user_id_regex.match(user_id):
2✔
UNCOV
97
            raise errors.ValidationError(
×
98
                message=f"The 'user_id' or 'member_id' path parameter {user_id} does not match the requried "
99
                f"regex {_path_user_id_regex}"
100
            )
101

102
        return await f(request, *args, **kwargs)
2✔
103

104
    return decorated_function
2✔
105

106

107
def only_admins(
    f: Callable[Concatenate[Request, APIUser, _P], Coroutine[Any, Any, _T]],
) -> Callable[Concatenate[Request, APIUser, _P], Coroutine[Any, Any, _T]]:
    """Decorator for a Sanic handler that only lets administrators through.

    Raises:
        errors.UnauthorizedError: if no user was supplied or the user carries no access token.
        errors.ForbiddenError: if the user has credentials but is not an admin.
    """

    @wraps(f)
    async def admin_guard(request: Request, user: APIUser, *args: _P.args, **kwargs: _P.kwargs) -> _T:
        has_credentials = user is not None and user.access_token is not None
        if not has_credentials:
            raise errors.UnauthorizedError(
                message="Please provide valid access credentials in the Authorization header."
            )

        if user.is_admin:
            # Authenticated admin: hand over to the real handler.
            return await f(request, user, *args, **kwargs)

        raise errors.ForbiddenError(message="You do not have the required permissions for this operation.")

    return admin_guard
126

127

128
def only_authenticated(f: Callable[_P, Coroutine[Any, Any, _T]]) -> Callable[_P, Coroutine[Any, Any, _T]]:
    """Decorator that rejects calls made without an authenticated user.

    The user is looked up in this order: the ``requested_by`` keyword argument, the ``user`` keyword
    argument, then the positional arguments. A keyword argument only counts when its value is an
    ``APIUser`` instance.

    Raises:
        errors.ProgrammingError: if the positional arguments are searched and they do not contain
            exactly one ``APIUser``.
        errors.UnauthorizedError: if no user was found or the user is not authenticated.
    """

    @wraps(f)
    async def authenticated_guard(*args: _P.args, **kwargs: _P.kwargs) -> _T:
        # Keyword arguments take precedence, "requested_by" before "user".
        api_user = next(
            (kwargs[key] for key in ("requested_by", "user") if isinstance(kwargs.get(key), APIUser)),
            None,
        )

        # Fall back to the positional arguments only when no keyword argument supplied a user.
        if api_user is None and args:
            positional_users = [arg for arg in args if isinstance(arg, APIUser)]
            if len(positional_users) != 1:
                raise errors.ProgrammingError(
                    detail="Found no or more than one valid non-keyword APIUser arguments when "
                    "authenticating user, expected only one."
                )
            api_user = positional_users[0]

        if api_user is None or not api_user.is_authenticated:
            raise errors.UnauthorizedError(message="You have to be authenticated to perform this operation.")

        # The user is authenticated, so the wrapped callable may run.
        return await f(*args, **kwargs)

    return authenticated_guard
159

160

161
def internal_gitlab_authenticate(
2✔
162
    authenticator: Authenticator,
163
) -> Callable[
164
    [Callable[Concatenate[Request, APIUser, APIUser, _P], Coroutine[Any, Any, _T]]],
165
    Callable[Concatenate[Request, APIUser, _P], Coroutine[Any, Any, _T]],
166
]:
167
    """Decorator for a Sanic handler that that adds a user for the internal gitlab user."""
168

169
    def decorator(
×
170
        f: Callable[Concatenate[Request, APIUser, APIUser, _P], Coroutine[Any, Any, _T]],
171
    ) -> Callable[Concatenate[Request, APIUser, _P], Coroutine[Any, Any, _T]]:
172
        @wraps(f)
×
173
        async def decorated_function(
×
174
            request: Request,
175
            user: APIUser,
176
            *args: _P.args,
177
            **kwargs: _P.kwargs,
178
        ) -> _T:
179
            access_token = str(request.headers.get("Gitlab-Access-Token"))
×
180
            internal_gitlab_user = await authenticator.authenticate(access_token, request)
×
181
            return await f(request, user, internal_gitlab_user, *args, **kwargs)
×
182

183
        return decorated_function
×
184

185
    return decorator
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc