• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

observatorycontrolsystem / ocs-authentication / 10325886202

09 Aug 2024 09:21PM UTC coverage: 86.348% (+5.6%) from 80.714%
10325886202

Pull #4

github

Jon
Removed auth classes from server called endpoint since its all handled by the permission
Pull Request #4: Ignore throttling from clients

3 of 4 new or added lines in 1 file covered. (75.0%)

1 existing line in 1 file now uncovered.

506 of 586 relevant lines covered (86.35%)

2.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.31
/ocs_authentication/util.py
1
from dataclasses import dataclass
3✔
2

3
import requests
3✔
4
from django.db import transaction
3✔
5
from django.contrib.auth import get_user_model
3✔
6
from rest_framework.throttling import BaseThrottle
3✔
7

8
from ocs_authentication.settings import ocs_auth_settings
3✔
9
from ocs_authentication.auth_profile.models import AuthProfile
3✔
10
from ocs_authentication.exceptions import ProfileException, OAuthTokenException
3✔
11

12

13
class NoThrottle(BaseThrottle):
3✔
14
    def allow_request(self, request, view):
3✔
NEW
15
        return True
×
16

17

18
@dataclass
3✔
19
class Profile:
3✔
20
    """Dataclass encapsulating profile information"""
21
    first_name: str
22
    last_name: str
23
    username: str
24
    email: str
25
    api_token: str
26
    is_staff: bool
27
    is_superuser: bool
28
    staff_view: bool
29

30

31
def get_profile(access_token: str) -> Profile:
32
    headers = {'Authorization': f'Bearer {access_token}'}
33
    profile_response = requests.get(
34
        ocs_auth_settings.OAUTH_PROFILE_URL,
35
        headers=headers,
36
        timeout=ocs_auth_settings.REQUESTS_TIMEOUT_SECONDS
37
    )
38
    if profile_response.status_code == 200:
39
        return Profile(
40
            profile_response.json()['first_name'],
41
            profile_response.json()['last_name'],
42
            profile_response.json()['username'],
43
            profile_response.json()['email'],
44
            profile_response.json()['tokens']['api_token'],
45
            profile_response.json()['is_staff'],
46
            profile_response.json()['is_superuser'],
47
            profile_response.json()['profile']['staff_view'],
48
        )
49
    else:
50
        raise ProfileException('Unable to access profile information')
51

52

53
def generate_tokens(username: str, password: str):
54
    token_response = requests.post(
55
        ocs_auth_settings.OAUTH_TOKEN_URL,
56
        data={
57
            'grant_type': 'password',
58
            'username': username,
59
            'password': password,
60
            'client_id': ocs_auth_settings.OAUTH_CLIENT_ID,
61
            'client_secret': ocs_auth_settings.OAUTH_CLIENT_SECRET
62
        },
63
        timeout=ocs_auth_settings.REQUESTS_TIMEOUT_SECONDS
64
    )
65
    if token_response.status_code == 200:
66
        return token_response.json()['access_token'], token_response.json()['refresh_token']
67
    else:
68
        raise OAuthTokenException('Failed to generate OAuth tokens')
69

70

71
def create_or_update_user(profile: Profile, password: str):
72
    with transaction.atomic():
73
        user, _ = get_user_model().objects.update_or_create(
74
            username=profile.username,
75
            defaults={
76
                'first_name': profile.first_name,
77
                'last_name': profile.last_name,
78
                'email': profile.email,
79
                'is_staff': profile.is_staff,
80
                'is_superuser': profile.is_superuser,
81
            }
82
        )
83
        if password:
84
            user.set_password(password)
85
        user.save()
86
        AuthProfile.objects.update_or_create(
87
            user=user,
88
            defaults={
89
                'staff_view': profile.staff_view,
90
                'api_token': profile.api_token
91
            }
92
        )
93
        # TODO:: This could will update DRFs internal authtoken as well. Uncomment this when we are
94
        #        ready to transition to a single api_token.
95
        # token, _ = Token.objects.get_or_create(user=user)
96
        # if profile.api_token != token.key:
97
        #     # Need to set the api token to some expected value
98
        #     token.delete()
99
        #     Token.objects.create(user=user, key=profile.api_token)
100
        return user
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc