• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

smartfile / django-session-jwt / 3850250062

pending completion
3850250062

Pull #44

github

GitHub
Merge a3897318e into c9f5d419f
Pull Request #44: Remove unsupported python version

278 of 291 relevant lines covered (95.53%)

3.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.3
/django_session_jwt/middleware/session.py
1
import logging
4✔
2
import time
4✔
3

4
from datetime import datetime, timedelta
4✔
5

6
from os.path import exists as pathexists
4✔
7

8
import jwt
4✔
9
from jwt.exceptions import DecodeError, ExpiredSignatureError
4✔
10

11
from importlib import import_module
4✔
12

13
from django.conf import settings
4✔
14
from django.contrib.auth import get_user_model
4✔
15
from django.contrib.sessions.middleware import SessionMiddleware as BaseSessionMiddleware
4✔
16
from django.core.exceptions import ImproperlyConfigured
4✔
17

18

19
def _parse_key(key):
4✔
20
    def _load_key(k):
4✔
21
        if pathexists(k):
4✔
22
            k = open(k, 'rb').read()
4✔
23
        return k
4✔
24

25
    if isinstance(key, tuple):
4✔
26
        # Key pair.
27
        return _load_key(key[0]), _load_key(key[1]), 'RS256'
4✔
28

29
    key = _load_key(key)
4✔
30
    return key, key, 'HS256'
4✔
31

32

33
def _parse_fields(fields):
4✔
34
    "Parse and validate field definitions."
35
    snames, lnames = [], []
4✔
36

37
    for i, field in enumerate(fields):
4✔
38
        # Transform field in 3-tuple.
39
        if isinstance(field, tuple):
4✔
40
            if len(field) == 2:  # (attrname, sname)
4✔
41
                field = (field[0], field[1], field[1])
×
42

43
            elif len(field) == 1:  # (attrname)
4✔
44
                field = (field[0], field[0], field[0])
×
45

46
        else:  # attrname
47
            field = (field, field, field)
×
48

49
        # Collect all snames and lnames for uniqueness check.
50
        snames.append(field[1])
4✔
51
        lnames.append(field[2])
4✔
52

53
        # Validate that "sk" is not used, we use that for the session key.
54
        if field[1] == SESSION_FIELD:
4✔
55
            raise ImproperlyConfigured(
×
56
                'Short name "%s" is reserved for session field. Use '
57
                'DJANGO_SESSION_JWT["SESSION_FIELD"] to specify another '
58
                'value.' % SESSION_FIELD)
59

60
        if len(field) != 3:
4✔
61
            raise ImproperlyConfigured(
×
62
                'DJANGO_SESSION_JWT["FIELDS"] should be a list of 3-tuples.')
63

64
        fields[i] = field
4✔
65

66
    if len(snames) != len(set(snames)):
4✔
67
        raise ImproperlyConfigured(
×
68
            'DJANGO_SESSION_JWT["FIELDS"] short names are not unique')
69

70
    if len(lnames) != len(set(lnames)):
4✔
71
        raise ImproperlyConfigured(
×
72
            'DJANGO_SESSION_JWT["FIELDS"] long names are not unique')
73

74
    return fields
4✔
75

76

77
def _parse_callable(f):
4✔
78
    if not f:
4✔
79
        return
×
80

81
    module_name, _, f_name = f.rpartition('.')
4✔
82
    m = import_module(module_name)
4✔
83

84
    return getattr(m, f_name)
4✔
85

86

87
DJANGO_SESSION_JWT = getattr(settings, 'DJANGO_SESSION_JWT', {})
4✔
88
SESSION_FIELD = DJANGO_SESSION_JWT.get('SESSION_FIELD', 'sk')
4✔
89
KEY, PUBKEY, ALGO = _parse_key(DJANGO_SESSION_JWT.get('KEY', settings.SECRET_KEY))
4✔
90
FIELDS = _parse_fields(DJANGO_SESSION_JWT.get('FIELDS', []))
4✔
91
CALLABLE = _parse_callable(DJANGO_SESSION_JWT.get('CALLABLE'))
4✔
92
EXPIRES = DJANGO_SESSION_JWT.get('EXPIRES', None)
4✔
93
LOGGER = logging.getLogger(__name__)
4✔
94
LOGGER.addHandler(logging.NullHandler())
4✔
95

96

97
def rgetattr(obj, name):
4✔
98
    "Recursive getattr()."
99
    names = name.split('.')
4✔
100
    for n in names:
4✔
101
        obj = getattr(obj, n)
4✔
102
    return obj
4✔
103

104

105
def verify_jwt(blob):
4✔
106
    """
107
    Verify a JWT and return the session_key attribute from it.
108
    """
109
    try:
4✔
110
        fields = jwt.decode(blob, PUBKEY, algorithms=[ALGO])
4✔
111

112
    except (DecodeError, ExpiredSignatureError):
4✔
113
        return {}
4✔
114

115
    # Convert short names to long names.
116
    for _, sname, lname in FIELDS:
4✔
117
        try:
4✔
118
            # Leave both short and long forms in dictionary.
119
            fields[lname] = fields[sname]
4✔
120

121
        except KeyError:
4✔
122
            continue
4✔
123

124
    return fields
4✔
125

126

127
def create_jwt(user, session_key, expires=None):
4✔
128
    """
129
    Create a JWT for the given user containing the configured fields.
130
    """
131
    fields = {
4✔
132
        SESSION_FIELD: session_key,
133
        'iat': datetime.utcnow(),
134
    }
135
    if expires:
4✔
136
        # Set a future expiration date.
137
        fields['exp'] = datetime.utcnow() + timedelta(seconds=expires)
4✔
138

139
    for attrname, sname, _ in FIELDS:
4✔
140
        try:
4✔
141
            fields[sname] = rgetattr(user, attrname)
4✔
142

143
        except AttributeError:
4✔
144
            # Omit missing fields:
145
            LOGGER.warning('Could not get missing field %s from user', attrname)
4✔
146
            continue
4✔
147

148
    if CALLABLE:
4✔
149
        fields.update(CALLABLE(user))
4✔
150

151
    return jwt.encode(fields, KEY, algorithm=ALGO)
4✔
152

153

154
def convert_cookie(cookies, user):
4✔
155
    cookie = cookies[settings.SESSION_COOKIE_NAME]
4✔
156
    cookies[settings.SESSION_COOKIE_NAME] = create_jwt(
4✔
157
        user, cookie.value, EXPIRES)
158

159

160
class SessionMiddleware(BaseSessionMiddleware):
4✔
161
    """
162
    Extend django.contrib.sessions middleware to use JWT as session cookie.
163
    """
164

165
    def process_request(self, request):
4✔
166
        session_jwt = request.COOKIES.get(settings.SESSION_COOKIE_NAME)
4✔
167
        fields = verify_jwt(session_jwt)
4✔
168

169
        session_key = fields.pop(SESSION_FIELD, None)
4✔
170
        request.session = self.SessionStore(session_key)
4✔
171
        if fields:
4✔
172
            request.session['jwt'] = fields
4✔
173

174
    def process_response(self, request, response):
4✔
175
        if not request.user.is_authenticated:
4✔
176
            # The user is unauthenticated. Try to determine the user by the
177
            # session JWT
178
            User = get_user_model()
4✔
179
            try:
4✔
180
                user_id = request.session['jwt']['user_id']
4✔
181
                user = User.objects.get(id=user_id)
4✔
182
            except (KeyError, User.DoesNotExist):
4✔
183
                # Unable to determine the user. ID will not be set in the JWT.
184
                user = None
4✔
185
        else:
186
            user = request.user
4✔
187

188
        # Rather than duplicating the session logic here, just allow super()
189
        # to do it's thing, then convert the session cookie (if any) when it's
190
        # done.
191
        super(SessionMiddleware, self).process_response(request, response)
4✔
192

193
        # Determine if JWT is more than halfway through it's lifetime.
194
        expires = getattr(request, 'session', {}).get('jwt', {}).get('exp', None)
4✔
195
        halftime = time.mktime((datetime.utcnow() + timedelta(seconds=EXPIRES / 2)).timetuple())
4✔
196
        halflife = expires and expires <= halftime
4✔
197

198
        # Behave the same as contrib.sessions, only recreate the JWT if the session
199
        # was modified or SESSION_SAVE_EVERY_REQUEST is enabled.
200
        if not halflife and not request.session.modified and \
4✔
201
           not settings.SESSION_SAVE_EVERY_REQUEST:
202
            return response
4✔
203

204
        try:
4✔
205
            convert_cookie(response.cookies, user)
4✔
206

207
        except (KeyError, AttributeError):
×
208
            # No cookie, no problem...
209
            pass
×
210

211
        return response
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc