• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 11288123511

11 Oct 2024 07:23AM UTC coverage: 90.667% (+0.2%) from 90.477%
11288123511

Pull #407

github

web-flow
Merge 20c6c8af6 into 5b095d795
Pull Request #407: feat!: add data connectors

1226 of 1325 new or added lines in 28 files covered. (92.53%)

3 existing lines in 3 files now uncovered.

10589 of 11679 relevant lines covered (90.67%)

1.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.61
/components/renku_data_services/base_api/auth.py
1
"""Authentication decorators for Sanic."""
2✔
2

3
import re
2✔
4
from collections.abc import Awaitable, Callable, Coroutine
2✔
5
from functools import wraps
2✔
6
from typing import Any, Concatenate, ParamSpec, TypeVar, cast
2✔
7

8
from sanic import Request
2✔
9

10
from renku_data_services import errors
2✔
11
from renku_data_services.base_models import APIUser, Authenticator
2✔
12

13
_T = TypeVar("_T")
2✔
14
_P = ParamSpec("_P")
2✔
15

16

17
def authenticate(
    authenticator: Authenticator,
) -> Callable[
    [Callable[Concatenate[Request, APIUser, _P], Awaitable[_T]]],
    Callable[Concatenate[Request, _P], Coroutine[Any, Any, _T]],
]:
    """Build a decorator that injects an APIUser into a Sanic handler.

    The handler is called with the APIUser as its second positional argument, right after the
    request. When the header named by ``authenticator.token_field`` is missing or shorter than
    8 characters, a default ``APIUser()`` is passed; otherwise the user returned by
    ``authenticator.authenticate`` is passed.
    """

    def decorator(
        f: Callable[Concatenate[Request, APIUser, _P], Awaitable[_T]],
    ) -> Callable[Concatenate[Request, _P], Coroutine[Any, Any, _T]]:
        @wraps(f)
        async def handler_with_user(request: Request, *args: _P.args, **kwargs: _P.kwargs) -> _T:
            api_user = APIUser()
            header_value = request.headers.get(authenticator.token_field)
            has_usable_token = header_value is not None and len(header_value) >= 8
            if has_usable_token:
                # Drop the auth scheme prefix, only the bare token is handed to the authenticator.
                bare_token = header_value.removeprefix("Bearer ").removeprefix("bearer ")
                api_user = await authenticator.authenticate(bare_token, request)

            return await f(request, api_user, *args, **kwargs)

        return handler_with_user

    return decorator
2✔
45

46

47
def validate_path_project_id(
2✔
48
    f: Callable[Concatenate[Request, _P], Awaitable[_T]],
49
) -> Callable[Concatenate[Request, _P], Awaitable[_T]]:
50
    """Decorator for a Sanic handler that validates the project_id path parameter."""
51
    _path_project_id_regex = re.compile(r"^[A-Za-z0-9]{26}$")
2✔
52

53
    @wraps(f)
2✔
54
    async def decorated_function(request: Request, *args: _P.args, **kwargs: _P.kwargs) -> _T:
2✔
55
        project_id = cast(str | None, kwargs.get("project_id"))
2✔
56
        if not project_id:
2✔
57
            raise errors.ProgrammingError(
×
58
                message="Could not find 'project_id' in the keyword arguments for the handler in order to validate it."
59
            )
60
        if not _path_project_id_regex.match(project_id):
2✔
61
            raise errors.ValidationError(
×
62
                message=f"The 'project_id' path parameter {project_id} does not match the required "
63
                f"regex {_path_project_id_regex}"
64
            )
65

66
        return await f(request, *args, **kwargs)
2✔
67

68
    return decorated_function
2✔
69

70

71
def validate_path_user_id(
2✔
72
    f: Callable[Concatenate[Request, _P], Awaitable[_T]],
73
) -> Callable[Concatenate[Request, _P], Awaitable[_T]]:
74
    """Decorator for a Sanic handler that validates the user_id or member_id path parameter."""
75
    _path_user_id_regex = re.compile(r"^[A-Za-z0-9]{1}[A-Za-z0-9-]+$")
2✔
76

77
    @wraps(f)
2✔
78
    async def decorated_function(request: Request, *args: _P.args, **kwargs: _P.kwargs) -> _T:
2✔
79
        user_id: str | None = cast(str | None, kwargs.get("user_id"))
2✔
80
        member_id: str | None = cast(str | None, kwargs.get("member_id"))
2✔
81
        if user_id and member_id:
2✔
82
            raise errors.ProgrammingError(
×
83
                message="Validating the user ID in a request path failed because matches for both"
84
                " 'user_id' and 'member_id' were found in the request handler parameters but only "
85
                "one match was expected."
86
            )
87
        user_id = user_id or member_id
2✔
88
        if not user_id:
2✔
89
            raise errors.ProgrammingError(
×
90
                message="Could not find 'user_id' or 'member_id' in the keyword arguments for the handler "
91
                "in order to validate it."
92
            )
93
        if not _path_user_id_regex.match(user_id):
2✔
UNCOV
94
            raise errors.ValidationError(
×
95
                message=f"The 'user_id' or 'member_id' path parameter {user_id} does not match the requried "
96
                f"regex {_path_user_id_regex}"
97
            )
98

99
        return await f(request, *args, **kwargs)
2✔
100

101
    return decorated_function
2✔
102

103

104
def only_admins(
2✔
105
    f: Callable[Concatenate[Request, APIUser, _P], Awaitable[_T]],
106
) -> Callable[Concatenate[Request, APIUser, _P], Awaitable[_T]]:
107
    """Decorator for a Sanic handler that errors out if the user is not an admin."""
108

109
    @wraps(f)
2✔
110
    async def decorated_function(request: Request, user: APIUser, *args: _P.args, **kwargs: _P.kwargs) -> _T:
2✔
111
        if user is None or user.access_token is None:
2✔
112
            raise errors.UnauthorizedError(
×
113
                message="Please provide valid access credentials in the Authorization header."
114
            )
115
        if not user.is_admin:
2✔
116
            raise errors.ForbiddenError(message="You do not have the required permissions for this operation.")
1✔
117

118
        # the user is authenticated and is an admin
119
        response = await f(request, user, *args, **kwargs)
2✔
120
        return response
2✔
121

122
    return decorated_function
2✔
123

124

125
def only_authenticated(f: Callable[_P, Awaitable[_T]]) -> Callable[_P, Awaitable[_T]]:
    """Decorator that rejects calls made without an authenticated APIUser.

    The APIUser is looked up first in the ``requested_by`` keyword argument, then in the ``user``
    keyword argument and finally among the positional arguments, where exactly one APIUser is expected.

    Raises:
        errors.ProgrammingError: positional arguments are searched and they hold no or several APIUsers.
        errors.UnauthorizedError: no APIUser was found or the one found is not authenticated.
    """

    @wraps(f)
    async def authenticated_guard(*args: _P.args, **kwargs: _P.kwargs) -> _T:
        api_user: APIUser | None = None
        requested_by = kwargs.get("requested_by")
        named_user = kwargs.get("user")
        if isinstance(requested_by, APIUser):
            api_user = requested_by
        elif isinstance(named_user, APIUser):
            api_user = named_user
        elif args:
            positional_users = [arg for arg in args if isinstance(arg, APIUser)]
            if len(positional_users) != 1:
                raise errors.ProgrammingError(
                    detail="Found no or more than one valid non-keyword APIUser arguments when "
                    "authenticating user, expected only one."
                )
            api_user = positional_users[0]

        if api_user is None or not api_user.is_authenticated:
            raise errors.UnauthorizedError(message="You have to be authenticated to perform this operation.")

        # An authenticated user was found, so the wrapped callable can run.
        return await f(*args, **kwargs)

    return authenticated_guard
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc