• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

smartfile / django-session-jwt / 3850292061

pending completion
3850292061

Pull #45

github

GitHub
Merge 429568191 into 1a507ba87
Pull Request #45: Bump django from 3.2.8 to 3.2.16

278 of 291 relevant lines covered (95.53%)

26.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.3
/django_session_jwt/middleware/session.py
1
import logging
28✔
2
import time
28✔
3

4
from datetime import datetime, timedelta
28✔
5

6
from os.path import exists as pathexists
28✔
7

8
import jwt
28✔
9
from jwt.exceptions import DecodeError, ExpiredSignatureError
28✔
10

11
from importlib import import_module
28✔
12

13
from django.conf import settings
28✔
14
from django.contrib.auth import get_user_model
28✔
15
from django.contrib.sessions.middleware import SessionMiddleware as BaseSessionMiddleware
28✔
16
from django.core.exceptions import ImproperlyConfigured
28✔
17

18

19
def _parse_key(key):
28✔
20
    def _load_key(k):
28✔
21
        if pathexists(k):
28✔
22
            k = open(k, 'rb').read()
28✔
23
        return k
28✔
24

25
    if isinstance(key, tuple):
28✔
26
        # Key pair.
27
        return _load_key(key[0]), _load_key(key[1]), 'RS256'
28✔
28

29
    key = _load_key(key)
28✔
30
    return key, key, 'HS256'
28✔
31

32

33
def _parse_fields(fields):
    """
    Parse and validate field definitions.

    Each entry of ``fields`` may be a bare attribute name, a 1-item sequence
    ``(attrname,)``, a 2-item sequence ``(attrname, sname)`` or a 3-item
    sequence ``(attrname, sname, lname)``. Tuples and lists are both
    accepted. Every entry is expanded to the 3-tuple form.

    Returns a new list of 3-tuples. The input sequence is not modified, so
    an immutable sequence is accepted as well.

    Raises ImproperlyConfigured when an entry cannot be expanded to a
    3-tuple, when a short name equals SESSION_FIELD, or when the short names
    or the long names are not unique.
    """
    parsed, snames, lnames = [], [], []

    for field in fields:
        # Transform field in 3-tuple.
        if isinstance(field, (tuple, list)):
            field = tuple(field)
            if len(field) == 2:  # (attrname, sname)
                field = (field[0], field[1], field[1])

            elif len(field) == 1:  # (attrname)
                field = (field[0], field[0], field[0])

        else:  # attrname
            field = (field, field, field)

        # Check the shape before any indexing so that a malformed entry is
        # reported as a configuration error rather than an IndexError.
        if len(field) != 3:
            raise ImproperlyConfigured(
                'DJANGO_SESSION_JWT["FIELDS"] should be a list of 3-tuples.')

        _, sname, lname = field

        # Validate that the session field name is not used as a short name,
        # it is reserved for the session key.
        if sname == SESSION_FIELD:
            raise ImproperlyConfigured(
                'Short name "%s" is reserved for session field. Use '
                'DJANGO_SESSION_JWT["SESSION_FIELD"] to specify another '
                'value.' % SESSION_FIELD)

        # Collect all snames and lnames for uniqueness check.
        snames.append(sname)
        lnames.append(lname)
        parsed.append(field)

    if len(snames) != len(set(snames)):
        raise ImproperlyConfigured(
            'DJANGO_SESSION_JWT["FIELDS"] short names are not unique')

    if len(lnames) != len(set(lnames)):
        raise ImproperlyConfigured(
            'DJANGO_SESSION_JWT["FIELDS"] long names are not unique')

    return parsed
28✔
75

76

77
def _parse_callable(f):
28✔
78
    if not f:
28✔
79
        return
×
80

81
    module_name, _, f_name = f.rpartition('.')
28✔
82
    m = import_module(module_name)
28✔
83

84
    return getattr(m, f_name)
28✔
85

86

87
# Package configuration, read once at import time from the optional
# DJANGO_SESSION_JWT dict in the Django settings.
DJANGO_SESSION_JWT = getattr(settings, 'DJANGO_SESSION_JWT', {})

# Name of the JWT field that carries the session key.
SESSION_FIELD = DJANGO_SESSION_JWT.get('SESSION_FIELD', 'sk')

# Signing key, verification key and algorithm. SECRET_KEY is the fallback
# when no KEY is configured.
KEY, PUBKEY, ALGO = _parse_key(
    DJANGO_SESSION_JWT.get('KEY', settings.SECRET_KEY))

# Normalized (attrname, sname, lname) definitions of the user fields that
# are copied into the JWT.
FIELDS = _parse_fields(DJANGO_SESSION_JWT.get('FIELDS', []))

# Optional callable, invoked with the user, whose result is merged into the
# JWT fields.
CALLABLE = _parse_callable(DJANGO_SESSION_JWT.get('CALLABLE'))

# JWT lifetime in seconds, or None when no expiration is configured.
EXPIRES = DJANGO_SESSION_JWT.get('EXPIRES')

# Library-style logger: the NullHandler keeps the module silent unless the
# application configures logging.
LOGGER = logging.getLogger(__name__)
LOGGER.addHandler(logging.NullHandler())
28✔
95

96

97
def rgetattr(obj, name):
    """
    Recursive getattr().

    ``name`` is a dot-separated attribute path; each segment is looked up on
    the result of the previous lookup, starting from ``obj``. AttributeError
    from any lookup propagates to the caller.
    """
    current = obj
    for part in name.split('.'):
        current = getattr(current, part)
    return current
28✔
103

104

105
def verify_jwt(blob):
    """
    Verify a JWT and return the fields it carries as a dict.

    The token is decoded with PUBKEY, allowing only ALGO. For every
    configured field whose short name is present in the token, the value is
    also stored under the long name; the short name is kept as well.

    Returns an empty dict when the token is rejected.
    """
    try:
        fields = jwt.decode(blob, PUBKEY, algorithms=[ALGO])

    except jwt.InvalidTokenError:
        # InvalidTokenError is the base class of DecodeError and
        # ExpiredSignatureError. Catching it also covers the other
        # validation failures, which would otherwise escape as unhandled
        # exceptions.
        return {}

    # Convert short names to long names.
    for _, sname, lname in FIELDS:
        if sname in fields:
            # Leave both short and long forms in dictionary.
            fields[lname] = fields[sname]

    return fields
28✔
125

126

127
def create_jwt(user, session_key, expires=None):
    """
    Create a JWT for the given user containing the configured fields.

    The token always carries ``session_key`` under SESSION_FIELD and an
    ``iat`` timestamp. When ``expires`` (seconds) is truthy an ``exp``
    timestamp is added. Each configured field is read from ``user`` and
    stored under its short name; attributes that cannot be read are logged
    and omitted. Finally, the mapping returned by CALLABLE (if configured)
    is merged in.

    Returns the token encoded with KEY and ALGO.
    """
    # Use a single timestamp so that exp is exactly iat + expires.
    now = datetime.utcnow()
    fields = {
        SESSION_FIELD: session_key,
        'iat': now,
    }
    if expires:
        # Set a future expiration date.
        fields['exp'] = now + timedelta(seconds=expires)

    for attrname, sname, _ in FIELDS:
        try:
            fields[sname] = rgetattr(user, attrname)

        except AttributeError:
            # Omit missing fields:
            LOGGER.warning('Could not get missing field %s from user', attrname)

    if CALLABLE:
        # A callable that returns nothing contributes no fields.
        fields.update(CALLABLE(user) or {})

    return jwt.encode(fields, KEY, algorithm=ALGO)
28✔
152

153

154
def convert_cookie(cookies, user):
    """
    Replace the session cookie's value in ``cookies`` with a JWT.

    The current cookie value is passed to create_jwt() as the session key,
    together with ``user`` and the configured EXPIRES. Raises KeyError when
    ``cookies`` holds no session cookie.
    """
    name = settings.SESSION_COOKIE_NAME
    session_key = cookies[name].value
    cookies[name] = create_jwt(user, session_key, EXPIRES)
158

159

160
class SessionMiddleware(BaseSessionMiddleware):
    """
    Extend django.contrib.sessions middleware to use JWT as session cookie.
    """

    def process_request(self, request):
        """
        Open the session named by the JWT found in the session cookie.

        The session key is taken from the SESSION_FIELD entry of the verified
        token; a rejected or missing token yields no key. Any remaining
        token fields are stored in the session under ``'jwt'``.
        """
        session_jwt = request.COOKIES.get(settings.SESSION_COOKIE_NAME)
        fields = verify_jwt(session_jwt)

        session_key = fields.pop(SESSION_FIELD, None)
        request.session = self.SessionStore(session_key)
        if fields:
            # NOTE(review): assigning into the session presumably flags it as
            # modified, which would make process_response() re-issue the
            # cookie for every request carrying a valid JWT -- confirm.
            request.session['jwt'] = fields

    def process_response(self, request, response):
        """
        Let the base middleware write the session cookie, then replace its
        value with a JWT when the cookie needs to be (re)issued.

        The JWT is re-created when it has used up at least half of its
        configured lifetime, when the session was modified, or when
        SESSION_SAVE_EVERY_REQUEST is enabled. Returns ``response``.
        """
        if not request.user.is_authenticated:
            # The user is unauthenticated. Try to determine the user by the
            # session JWT
            User = get_user_model()
            try:
                user_id = request.session['jwt']['user_id']
                user = User.objects.get(id=user_id)
            except (KeyError, User.DoesNotExist):
                # Unable to determine the user. ID will not be set in the JWT.
                user = None
        else:
            user = request.user

        # Rather than duplicating the session logic here, just allow super()
        # to do its thing, then convert the session cookie (if any) when it's
        # done.
        super(SessionMiddleware, self).process_response(request, response)

        # Determine if JWT is more than halfway through its lifetime. The
        # comparison uses time.time() so that both sides are seconds since
        # the epoch, and it is skipped when no lifetime is configured
        # (EXPIRES is None) or the token carries no 'exp'.
        expires = getattr(request, 'session', {}).get('jwt', {}).get('exp', None)
        halflife = False
        if EXPIRES and expires:
            halflife = expires <= time.time() + EXPIRES / 2

        # Behave the same as contrib.sessions, only recreate the JWT if the session
        # was modified or SESSION_SAVE_EVERY_REQUEST is enabled.
        if not halflife and not request.session.modified and \
           not settings.SESSION_SAVE_EVERY_REQUEST:
            return response

        try:
            convert_cookie(response.cookies, user)

        except (KeyError, AttributeError):
            # No cookie, no problem...
            pass

        return response
28✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc