• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Clinical-Genomics / trailblazer / 12596434604

03 Jan 2025 10:37AM UTC coverage: 87.616%. First build
12596434604

Pull #507

github

ChrOertlin
fix: fix error
Pull Request #507: feat( user token verification)

25 of 52 new or added lines in 4 files covered. (48.08%)

2080 of 2374 relevant lines covered (87.62%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

37.84
/trailblazer/services/user_verification_service/user_verification_service.py
1
from typing import Mapping
1✔
2

3
from google.auth import jwt
1✔
4
import requests
1✔
5

6
from trailblazer.services.user_verification_service.exc import (
1✔
7
    GoogleCertsError,
8
    UserTokenVerificationError,
9
)
10
from trailblazer.store.models import User
1✔
11
from trailblazer.store.store import Store
1✔
12

13

14
class UserVerificationService:
    """Service to verify the user.

    Decodes the JWT found in an ``Authorization`` header against the public
    certificates served by Google, then looks the token's email address up in
    the store.
    """

    def __init__(self, store: Store, google_client_id: str):
        """Keep the store used for user lookups and the expected token audience."""
        self.store: Store = store
        self.google_client_id: str = google_client_id

    def verify_user(self, authorization_header: str) -> User:
        """Verify the user by checking if the JWT token provided is valid.

        Args:
            authorization_header: Value of the request's ``Authorization`` header.

        Returns:
            The user in the store matching the email claim of the token.

        Raises:
            ValueError: If no authorization header is provided, or no matching
                user is found in the store.
            GoogleCertsError: If the Google public keys cannot be fetched.
            UserTokenVerificationError: If the token cannot be verified or
                carries no email claim.
        """
        jwt_token: str = self._extract_token_from_header(authorization_header)
        google_certs: Mapping = self._get_google_certs()
        try:
            payload: Mapping = jwt.decode(
                token=jwt_token, certs=google_certs, verify=True, audience=self.google_client_id
            )
        except Exception as error:
            # Deliberately broad: any failure while decoding means the token is not trusted.
            raise UserTokenVerificationError(
                "Could not verify user token. It might be false or expired."
            ) from error
        try:
            user_email: str = payload["email"]
        except KeyError as error:
            # A token without an email claim cannot be mapped to a user; report it
            # as a verification failure instead of leaking a bare KeyError.
            raise UserTokenVerificationError(
                "User token does not contain an email claim."
            ) from error
        return self._get_user(user_email)

    @staticmethod
    def _extract_token_from_header(authorization_header: str) -> str:
        """Extract the token from the authorization header.

        Everything after the last ``"Bearer "`` marker is taken as the token; a
        header without the marker is returned as-is (minus surrounding
        whitespace).

        Args:
            authorization_header: The authorization header.

        Returns:
            The token with surrounding whitespace removed.

        Raises:
            ValueError: If no authorization header is provided.
        """
        if not authorization_header:
            raise ValueError("No authorization header provided with request")
        # Strip so stray whitespace around the header value never ends up in the token.
        return authorization_header.split("Bearer ")[-1].strip()

    @staticmethod
    def _get_google_certs(timeout: float = 10.0) -> Mapping:
        """Get the Google certificates.

        Args:
            timeout: Seconds to wait for Google to respond before giving up.

        Returns:
            The decoded JSON body of the certificate endpoint.

        Raises:
            GoogleCertsError: If the request fails, times out, returns an error
                status, or the response body is not valid JSON.
        """
        try:
            # Fetch the Google public keys. Google oauth uses v1 certs.
            # Always pass a timeout: without one a stalled connection blocks forever.
            response = requests.get(
                "https://www.googleapis.com/oauth2/v1/certs", timeout=timeout
            )
            response.raise_for_status()
            return response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers JSON decoding failures on requests versions where
            # that error is not a RequestException subclass.
            raise GoogleCertsError("Failed to fetch Google public keys") from e

    def _get_user(self, user_email: str) -> User:
        """Check if the user is known.

        Args:
            user_email: Email address to look up; the store is queried with
                ``exclude_archived=True``.

        Returns:
            The user returned by the store.

        Raises:
            ValueError: If the store returns no user for the email.
        """
        if user := self.store.get_user(email=user_email, exclude_archived=True):
            return user
        raise ValueError("User not found in the database")
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc