• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ghga-de / test-oidc-provider / 14860331396

06 May 2025 01:01PM UTC coverage: 86.557% (-0.6%) from 87.129%
14860331396

Pull #14

github

Cito
Improve error messages
Pull Request #14: Improve error messages

0 of 2 new or added lines in 1 file covered. (0.0%)

2 existing lines in 1 file now uncovered.

264 of 305 relevant lines covered (86.56%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.36
/src/top/api/main.py
1
# Copyright 2021 - 2025 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln
2
# for the German Human Genome-Phenome Archive (GHGA)
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15

16
"""Module containing the main FastAPI router and API endpoints."""
17

18
import logging
1✔
19
from enum import Enum
1✔
20
from typing import Annotated
1✔
21

22
from fastapi import (
1✔
23
    FastAPI,
24
    Form,
25
    HTTPException,
26
    Request,
27
    Response,
28
    Security,
29
    status,
30
)
31
from fastapi.responses import RedirectResponse
1✔
32
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
1✔
33
from ghga_service_commons.api import configure_app
1✔
34
from pydantic import AnyHttpUrl
1✔
35

36
from ..config import CONFIG
1✔
37
from ..core.http_utils import get_original_url
1✔
38
from ..core.models import LoginInfo, OidcConfiguration, TokenResponse, UserInfo
1✔
39
from ..core.oidc_provider import Jwks, OidcProvider
1✔
40

41
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# The FastAPI application object; it is passed to configure_app together
# with the service CONFIG before any route is registered on it.
app = FastAPI()
configure_app(app, config=CONFIG)

# One provider instance, built from CONFIG, shared by all endpoints below.
oidc_provider = OidcProvider(CONFIG)

# OpenAPI tags attached to every route declared in this module.
tags: list[str | Enum] = ["TestOP"]
49

50

51
@app.get(
    "/health",
    summary="health",
    tags=tags,
    status_code=status.HTTP_200_OK,
)
async def health():
    """Liveness probe: always answers with a fixed OK status payload."""
    payload = {"status": "OK"}
    return payload
60

61

62
@app.get(
    "/.well-known/openid-configuration",
    summary="Get the OpenID connect configuration",
    tags=tags,
    status_code=status.HTTP_200_OK,
)
async def get_openid_configuration(request: Request) -> OidcConfiguration:
    """Serve the OpenID discovery document.

    The endpoint URLs are built relative to the URL that get_original_url
    reports for this request; the issuer is taken from CONFIG.
    """
    # Strip this route's own path from the request URL to obtain the base URL.
    own_route = ".well-known/openid-configuration"
    base_url = get_original_url(request).removesuffix(own_route)

    # Build (and validate) the sibling endpoint URLs in a fixed order.
    endpoints = {
        name: AnyHttpUrl(base_url + name)
        for name in ("authorize", "token", "userinfo", "jwks")
    }

    return OidcConfiguration(
        authorization_endpoint=endpoints["authorize"],
        token_endpoint=endpoints["token"],
        userinfo_endpoint=endpoints["userinfo"],
        issuer=CONFIG.issuer,
        jwks_uri=endpoints["jwks"],
    )
85

86

87
@app.get(
    "/jwks",
    summary="Get the JSON Web Key Set of the OP",
    tags=tags,
    status_code=status.HTTP_200_OK,
)
async def get_jwks() -> Jwks:
    """Return the JSON Web Key Set exposed by the shared OIDC provider."""
    key_set = oidc_provider.jwks
    return key_set
96

97

98
@app.post(
    "/login",
    summary="Log in as a test user",
    tags=tags,
    status_code=status.HTTP_201_CREATED,
    responses={
        status.HTTP_201_CREATED: {
            "model": str,
            "description": "Access token has been created.",
        },
        status.HTTP_422_UNPROCESSABLE_ENTITY: {
            "description": "Validation error in submitted data."
        },
    },
)
async def login(login_info: LoginInfo) -> Response:
    """Log in to the OP as a test user and hand back the created token.

    The token produced by the provider is returned as the raw response body
    with media type ``application/jwt`` and status 201.

    Raises:
        HTTPException: 422 if the provider rejects the login info with a
            TypeError or ValueError; the error text becomes the detail.
    """
    log.debug("Logging in with info: %s", login_info)
    try:
        access_token = oidc_provider.login(login_info)
    except (TypeError, ValueError) as exc:
        log.info("Invalid login info: %s", exc)
        rejection = HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=str(exc),
        )
        raise rejection from exc
    else:
        log.debug("Created login token: %s", access_token)
        return Response(
            content=access_token,
            media_type="application/jwt",
            status_code=status.HTTP_201_CREATED,
        )
128

129

130
@app.get(
    "/authorize",
    summary="OIDC Authorization Endpoint",
    tags=tags,
    status_code=status.HTTP_302_FOUND,
    responses={
        status.HTTP_302_FOUND: {"description": "Redirected back to client."},
        # The handler answers with 400 when the provider raises ValueError,
        # so that status is documented here just as the token endpoint does.
        status.HTTP_400_BAD_REQUEST: {"description": "Error in submitted data."},
        status.HTTP_422_UNPROCESSABLE_ENTITY: {
            "description": "Validation error in submitted data."
        },
    },
)
async def authorize(
    response_type: str,
    client_id: str,
    redirect_uri: str,
    scope: str,
    state: str,
) -> RedirectResponse:
    """The authorization endpoint of the test OP.

    This authorizes the last logged in test user without checking credentials.

    All five query parameters are required and are passed unchanged to the
    provider, which returns the URL to redirect the client to.

    Raises:
        HTTPException: 400 if the provider raises a ValueError; the error
            text is used as the response detail.
    """
    try:
        url = oidc_provider.authorize(
            response_type=response_type,
            client_id=client_id,
            redirect_uri=redirect_uri,
            scope=scope,
            state=state,
        )
    except ValueError as error:
        log.error("Error in authorization: %s", error)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(error),
        ) from error
    # Redirect back with code and state or error message
    return RedirectResponse(url=url, status_code=status.HTTP_302_FOUND)
169

170

171
@app.post(
    "/token",
    summary="OIDC Token Endpoint",
    tags=tags,
    status_code=status.HTTP_200_OK,
    responses={
        status.HTTP_200_OK: {
            "model": TokenResponse,
            "description": "Access token has been granted.",
        },
        status.HTTP_400_BAD_REQUEST: {"description": "Error in submitted data."},
        status.HTTP_422_UNPROCESSABLE_ENTITY: {
            "description": "Validation error in submitted data."
        },
    },
)
async def token(
    grant_type: str = Form(""),
    code: str = Form(""),
    redirect_uri: str = Form(""),
    client_id: str = Form(""),
) -> TokenResponse:
    """The token endpoint of the test OP.

    The four form fields default to the empty string when not submitted and
    are forwarded unchanged to the provider.

    Raises:
        HTTPException: 400 if the provider raises a ValueError; the error
            text is used as the response detail.
    """
    form_fields = {
        "grant_type": grant_type,
        "code": code,
        "redirect_uri": redirect_uri,
        "client_id": client_id,
    }
    try:
        granted = oidc_provider.token(**form_fields)
    except ValueError as exc:
        log.warning("Error when getting token: %s", exc)
        bad_request = HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(exc),
        )
        raise bad_request from exc
    return granted
207

208

209
@app.get(
    "/userinfo",
    summary="Get user information",
    tags=tags,
    status_code=status.HTTP_200_OK,
    responses={
        status.HTTP_200_OK: {
            "model": UserInfo,
            "description": "User info has been fetched.",
        },
        status.HTTP_403_FORBIDDEN: {"description": "Not authorized to get user info."},
        status.HTTP_422_UNPROCESSABLE_ENTITY: {
            "description": "Validation error in submitted data."
        },
    },
)
async def get_userinfo(
    credentials: Annotated[HTTPAuthorizationCredentials, Security(HTTPBearer())],
) -> UserInfo:
    """The UserInfo endpoint of the test OP.

    The bearer token from the Authorization header is handed to the provider
    to look up the matching user info.

    Raises:
        HTTPException: 403 if the provider raises a KeyError for the token.
    """
    bearer_token = credentials.credentials
    log.debug("Getting user info for token: %s", bearer_token)
    try:
        user_info = oidc_provider.user_info(bearer_token)
    except KeyError as exc:
        log.info("User not found in cache.")
        forbidden = HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=str(exc),
        )
        raise forbidden from exc
    return user_info
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc