• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

askomics / flaskomics / 6590008757

20 Oct 2023 03:58PM UTC coverage: 83.758% (+0.4%) from 83.31%
6590008757

push

github-actions

web-flow
Merge pull request #420 from askomics/dev

Release 4.5.0

633 of 633 new or added lines in 29 files covered. (100.0%)

6240 of 7450 relevant lines covered (83.76%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.32
/askomics/api/auth.py
1
"""Authentication routes"""
2

3
import traceback
1✔
4
import sys
1✔
5

6
from functools import wraps
1✔
7

8
from askomics.libaskomics.LocalAuth import LocalAuth
1✔
9

10
from flask import (Blueprint, current_app, jsonify, request, session, render_template)
1✔
11

12
# Blueprint that collects every authentication route declared in this module
# (created with url_prefix '/', so route paths are used as written).
auth_bp = Blueprint('auth', __name__, url_prefix='/')
1✔
13

14

15
def login_required(f):
    """Decorator: only run the view for a real, non-blocked session user

    Parameters
    ----------
    f : callable
        The view function to protect

    Returns
    -------
    callable
        The wrapped view. It answers with a 401 JSON error when there is no
        user in the session, when the session user is flagged 'fake'
        (presumably the anonymous placeholder user -- confirm in LocalAuth),
        or when the account is blocked.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        """Check the session user, then delegate to the view"""
        # No user at all, or a 'fake' user: treated as not logged in
        if 'user' not in session or session['user'].get('fake', False):
            return jsonify({"error": True, "errorMessage": "Login required"}), 401

        if session['user']['blocked']:
            return jsonify({"error": True, "errorMessage": "Blocked account"}), 401

        return f(*args, **kwargs)

    return guarded_view
1✔
27

28

29
def login_required_query(f):
    """Decorator: like a login check, but may fall back to an anonymous user

    When the 'anonymous_query' option of the 'askomics' config section is
    enabled and nobody is logged in, the user returned by
    LocalAuth.get_anonymous_user() is stored in the session and the view runs.

    Parameters
    ----------
    f : callable
        The view function to protect

    Returns
    -------
    callable
        The wrapped view; it answers with a 401 JSON error when access is denied
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        """Resolve the session user (real or anonymous), then delegate to the view"""

        def anonymous_allowed():
            # Read the option lazily, only on the paths that need it
            return current_app.iniconfig.getboolean('askomics', 'anonymous_query', fallback=False)

        if 'user' not in session:
            if not anonymous_allowed():
                return jsonify({"error": True, "errorMessage": "Login required"}), 401
            # Nobody logged in but anonymous queries are enabled
            local_auth = LocalAuth(current_app, session)
            session['user'] = local_auth.get_anonymous_user()
            return f(*args, **kwargs)

        # A 'fake' user left in the session after anonymous queries were
        # disabled in the configuration: drop it and require a login
        if session['user'].get('fake', False) and not anonymous_allowed():
            session.pop('user')
            return jsonify({"error": True, "errorMessage": "Login required"}), 401

        if session['user']['blocked']:
            return jsonify({"error": True, "errorMessage": "Blocked account"}), 401

        return f(*args, **kwargs)

    return guarded_view
1✔
51

52

53
def api_auth(f):
    """Decorator: populate the session user from an X-API-KEY header

    The view is always executed; this decorator performs no access control
    by itself. When the header is present and the key authenticates, the
    matching user is stored in the session before the view runs.

    Parameters
    ----------
    f : callable
        The view function to wrap

    Returns
    -------
    callable
        The wrapped view
    """
    @wraps(f)
    def keyed_view(*args, **kwargs):
        """Try API-key authentication, then delegate to the view"""
        api_key = request.headers.get("X-API-KEY")
        if api_key:
            outcome = LocalAuth(current_app, session).authenticate_user_with_apikey(api_key)
            # A failed key authentication is ignored: the session is left as is
            if not outcome["error"]:
                session["user"] = outcome["user"]
        return f(*args, **kwargs)

    return keyed_view
1✔
67

68

69
def admin_required(f):
    """Decorator: only run the view when the session user is an admin

    Parameters
    ----------
    f : callable
        The view function to protect

    Returns
    -------
    callable
        The wrapped view; it answers with a 401 JSON error when there is no
        user in the session or when the user's 'admin' flag is falsy
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        """Check the admin flag, then delegate to the view"""
        if 'user' not in session:
            return jsonify({"error": True, "errorMessage": "Login required"}), 401

        if not session['user']['admin']:
            return jsonify({"error": True, "errorMessage": "Admin required"}), 401

        return f(*args, **kwargs)

    return guarded_view
1✔
81

82

83
def local_required(f):
    """Decorator: only run the view for a user that is not an LDAP account

    Parameters
    ----------
    f : callable
        The view function to protect

    Returns
    -------
    callable
        The wrapped view; it answers with a 401 JSON error when there is no
        user in the session or when the user's 'ldap' flag is truthy
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        """Check the ldap flag, then delegate to the view"""
        if "user" not in session:
            return jsonify({"error": True, "errorMessage": "Login required"}), 401

        current_app.logger.debug(session["user"])

        if session["user"]["ldap"]:
            return jsonify({"error": True, "errorMessage": "Local user required"}), 401

        return f(*args, **kwargs)

    return guarded_view
1✔
96

97

98
@auth_bp.route('/api/auth/signup', methods=['POST'])
def signup():
    """Create a new account from the JSON body and log the new user in

    Returns
    -------
    json
        error: True if the account could not be created
        errorMessage: the reason of the failure, if any
        user: the created user, or an empty dict on failure
    """
    # Account creation can be switched off in the configuration
    if current_app.iniconfig.getboolean("askomics", "disable_account_creation"):
        disabled_answer = {
            'error': True,
            'errorMessage': "Account creation is disabled",
            'user': {}
        }
        return jsonify(disabled_answer), 400

    payload = request.get_json()
    if not payload:
        missing_answer = {
            'error': True,
            'errorMessage': "Missing parameters",
            'user': {}
        }
        return jsonify(missing_answer), 400

    local_auth = LocalAuth(current_app, session)
    local_auth.check_inputs(payload)

    new_user = {}
    if not local_auth.get_error():
        # Inputs are valid: store the user, prepare its directories, open the session
        new_user = local_auth.persist_user(payload)
        local_auth.create_user_directories(new_user['id'], new_user['username'])
        session['user'] = new_user

    return jsonify({
        'error': local_auth.get_error(),
        'errorMessage': local_auth.get_error_message(),
        'user': new_user
    })
137

138

139
@auth_bp.route('/api/auth/login', methods=['POST'])
def login():
    """Authenticate a user from the 'login' and 'password' of the JSON body

    Returns
    -------
    json
        error: True if the authentication failed
        errorMessage: the authentication error messages
        user: the logged user, or an empty dict on failure
    """
    payload = request.get_json()
    if not payload or not payload.get("login") or not payload.get("password"):
        return jsonify({
            'error': True,
            'errorMessage': "Missing login or password",
            'user': None
        }), 400

    outcome = LocalAuth(current_app, session).authenticate_user(payload["login"], payload["password"])

    if outcome['error']:
        logged_user = {}
    else:
        # Successful authentication: open the session for this user
        logged_user = outcome['user']
        session['user'] = logged_user

    return jsonify({
        'error': outcome['error'],
        'errorMessage': outcome['error_messages'],
        'user': logged_user
    })
169

170

171
@auth_bp.route('/loginapikey/<path:key>', methods=["GET"])
def login_api_key(key):
    """Open a session from an API key given in the URL

    Parameters
    ----------
    key : string
        User API key

    Returns
    -------
    html or json
        The rendered index page when the key authenticates, otherwise a
        JSON object with error, errorMessage and user
    """
    outcome = LocalAuth(current_app, session).authenticate_user_with_apikey(key)

    if outcome["error"]:
        return jsonify({
            'error': outcome['error'],
            'errorMessage': outcome['error_messages'],
            'user': outcome['user']
        })

    session["user"] = outcome["user"]

    # Both options are optional: any failure to read them keeps the default
    try:
        proxy_path = current_app.iniconfig.get('askomics', 'reverse_proxy_path')
    except Exception:
        proxy_path = "/"

    try:
        title = "AskOmics | {}".format(current_app.iniconfig.get('askomics', 'subtitle'))
    except Exception:
        title = "AskOmics"

    return render_template('index.html', title=title, proxy_path=proxy_path, redirect="/")
213

214

215
@auth_bp.route('/api/auth/profile', methods=['POST'])
@local_required
def update_profile():
    """Update the profile (first name, last name, email) of the logged user

    At least one of "newFname", "newLname" or "newEmail" must be present
    in the JSON body.

    Returns
    -------
    json
        error: True if the update failed
        errorMessage: the error message, if any
        user: the updated user
    """
    payload = request.get_json()
    profile_keys = ["newFname", "newLname", "newEmail"]
    if not payload or not any(key in payload for key in profile_keys):
        return jsonify({
            "error": True,
            "errorMessage": "Missing parameters"
        }), 400

    outcome = LocalAuth(current_app, session).update_profile(payload, session['user'])

    # Keep the session in sync with the user returned by the update
    session['user'] = outcome['user']

    return jsonify({
        'error': outcome['error'],
        'errorMessage': outcome['error_message'],
        'user': outcome['user']
    })
242

243

244
@auth_bp.route('/api/auth/password', methods=['POST'])
@local_required
def update_password():
    """Update the password of the logged user

    The JSON body must contain "oldPassword", "newPassword" and
    "confPassword".

    Returns
    -------
    json
        error: True if the update failed
        errorMessage: the error message, if any
        user: the user
    """
    payload = request.get_json()
    password_keys = ["oldPassword", "newPassword", "confPassword"]
    if not payload or not all(key in payload for key in password_keys):
        return jsonify({
            "error": True,
            "errorMessage": "Missing parameters"
        }), 400

    outcome = LocalAuth(current_app, session).update_password(payload, session['user'])

    return jsonify({
        'error': outcome['error'],
        'errorMessage': outcome['error_message'],
        'user': outcome['user']
    })
269

270

271
@auth_bp.route('/api/auth/apikey', methods=['GET'])
@login_required
def update_apikey():
    """Renew the API key of the logged user

    Returns
    -------
    json
        error: True if the update failed
        errorMessage: the error message, if any
        user: the user with his new apikey
    """
    outcome = LocalAuth(current_app, session).update_apikey(session['user'])

    # Keep the session in sync with the user returned by the update
    session['user'] = outcome['user']

    return jsonify({
        'error': outcome['error'],
        'errorMessage': outcome['error_message'],
        'user': outcome['user']
    })
291

292

293
@auth_bp.route('/api/auth/galaxy', methods=['POST'])
@login_required
def update_galaxy():
    """Add or update the Galaxy account linked to the logged user

    The JSON body must contain non-empty "gurl" and "gkey" values
    (presumably the Galaxy URL and API key -- confirm against LocalAuth).

    The route requires a logged user: every code path below reads
    session["user"], which would raise a KeyError (HTTP 500) for an
    unauthenticated request instead of a clean 401 answer.

    Returns
    -------
    json
        error: True if the update failed
        errorMessage: the error message, if any
        user: the user with his Galaxy account information
    """
    data = request.get_json()
    if not (data and data.get("gurl") and data.get("gkey")):
        return jsonify({
            'error': True,
            'errorMessage': "Missing parameters",
            'user': session["user"]
        }), 400

    local_auth = LocalAuth(current_app, session)
    # Update the existing Galaxy account if the user has one, otherwise add it
    if session["user"]["galaxy"]:
        updated_user = local_auth.update_galaxy_account(session["user"], data["gurl"], data["gkey"])
    else:
        updated_user = local_auth.add_galaxy_account(session["user"], data["gurl"], data["gkey"])

    session["user"] = updated_user["user"]
    current_app.logger.debug(updated_user["user"])

    return jsonify({
        'error': updated_user['error'],
        'errorMessage': updated_user['error_message'],
        'user': updated_user['user']
    })
324

325

326
@auth_bp.route('/api/auth/logout', methods=['GET'])
def logout():
    """Close the session of the current user

    Returns
    -------
    json
        user: an empty dict
        logged: False
    """
    # Popping with a default makes the route safe to call when nobody is logged in
    session.pop('user', None)
    current_app.logger.debug(session)

    logged_out = {'user': {}, 'logged': False}
    return jsonify(logged_out)
1✔
339

340

341
@auth_bp.route("/api/auth/reset_password", methods=["POST"])
def reset_password():
    """Handle the steps of the password reset workflow

    The keys of the JSON body select the step:

    - "login": send a reset link
    - "token" without "password": check if the token is valid
    - "token", "password" and "passwordConf": update the password

    Returns
    -------
    json
        error and errorMessage for every step; the token check also
        returns token, username, fname and lname
    """
    payload = request.get_json()
    if not payload:
        return jsonify({
            "error": True,
            "errorMessage": "Missing parameters"
        }), 400

    # Step 1: send a reset link
    if "login" in payload:
        try:
            LocalAuth(current_app, session).send_reset_link(payload["login"])
        except Exception as exc:
            traceback.print_exc(file=sys.stdout)
            return jsonify({
                "error": True,
                "errorMessage": str(exc)
            }), 500

        return jsonify({
            "error": False,
            "errorMessage": ""
        })

    # Step 2: check if token is valid
    if "token" in payload and "password" not in payload:
        try:
            token_info = LocalAuth(current_app, session).check_token(payload["token"])
        except Exception as exc:
            traceback.print_exc(file=sys.stdout)
            return jsonify({
                "token": None,
                "username": None,
                "fname": None,
                "lname": None,
                "error": True,
                "errorMessage": str(exc)
            }), 500

        return jsonify({
            "token": payload["token"],
            "username": token_info["username"],
            "fname": token_info["fname"],
            "lname": token_info["lname"],
            # No username in the result means the token was not accepted
            "error": not token_info["username"],
            "errorMessage": token_info["message"]
        })

    # Step 3: update password; anything else is an incomplete request
    if not all(key in payload for key in ("token", "password", "passwordConf")):
        return jsonify({
            "error": True,
            "errorMessage": "Missing parameters"
        }), 400

    try:
        reset_outcome = LocalAuth(current_app, session).reset_password_with_token(
            payload["token"], payload["password"], payload["passwordConf"]
        )
    except Exception as exc:
        traceback.print_exc(file=sys.stdout)
        return jsonify({
            "error": True,
            "errorMessage": str(exc)
        }), 500

    return jsonify({
        "error": reset_outcome["error"],
        "errorMessage": reset_outcome["message"]
    })
414

415

416
@auth_bp.route("/api/auth/delete_account", methods=["GET"])
@login_required
def delete_account():
    """Delete the account of the logged user and close his session

    Returns
    -------
    json
        error: True if something went wrong (HTTP 500)
        errorMessage: the text of the raised exception, if any
    """
    try:
        current_user = session["user"]

        # Remove user from database
        LocalAuth(current_app, session).delete_user_database(current_user["username"])

        # Celery task to delete user's data from filesystem and rdf triplestore
        task_args = ({'user': current_user}, [current_user], True)
        current_app.celery.send_task('delete_users_data', task_args)

        # Remove user from session
        session.pop('user', None)

    except Exception as exc:
        traceback.print_exc(file=sys.stdout)
        return jsonify({
            "error": True,
            "errorMessage": str(exc)
        }), 500

    return jsonify({
        "error": False,
        "errorMessage": ""
    })
443

444

445
@auth_bp.route("/api/auth/reset_account", methods=["GET"])
@login_required
def reset_account():
    """Reset the account of the logged user, keeping the user itself

    Returns
    -------
    json
        error: True if something went wrong (HTTP 500)
        errorMessage: the text of the raised exception, if any
    """
    try:
        current_user = session["user"]

        # Clean the database entries of the user, without deleting the user
        LocalAuth(current_app, session).delete_user_database(current_user["username"], delete_user=False)

        # Celery task to delete user's data from filesystem and rdf triplestore
        task_args = ({'user': current_user}, [current_user], False)
        current_app.celery.send_task('delete_users_data', task_args)

    except Exception as exc:
        traceback.print_exc(file=sys.stdout)
        return jsonify({
            "error": True,
            "errorMessage": str(exc)
        }), 500

    return jsonify({
        "error": False,
        "errorMessage": ""
    })
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc