• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SeaweedbrainCY / zero-totp / 12592571620

03 Jan 2025 04:04AM UTC coverage: 94.427% (-0.4%) from 94.865%
12592571620

Pull #157

github

SeaweedbrainCY
Add build info
Pull Request #157: Add healthcheck endpoint

18 of 46 new or added lines in 3 files covered. (39.13%)

110 existing lines in 3 files now uncovered.

11081 of 11735 relevant lines covered (94.43%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.52
api/controllers.py
1
from flask import request, redirect, make_response
1✔
2
from Utils.http_response import Response
1✔
3
import flask
1✔
4
import connexion
1✔
5
import json
1✔
6
from database.user_repo import User as UserDB
1✔
7
from database.zke_repo import ZKE as ZKE_DB
1✔
8
from database.totp_secret_repo import TOTP_secret as TOTP_secretDB
1✔
9
from database.google_drive_integration_repo import GoogleDriveIntegration as GoogleDriveIntegrationDB
1✔
10
from database.preferences_repo import Preferences as PreferencesDB
1✔
11
from database.notif_repo import Notifications as Notifications_db
1✔
12
from database.refresh_token_repo import RefreshTokenRepo as RefreshToken_db
1✔
13
from database.rate_limiting_repo import RateLimitingRepo as Rate_Limiting_DB
1✔
14
from database.session_token_repo import SessionTokenRepo 
1✔
15
from CryptoClasses.hash_func import Bcrypt
1✔
16
from environment import logging, conf
1✔
17
from database.oauth_tokens_repo import Oauth_tokens as Oauth_tokens_db
1✔
18
from CryptoClasses.hash_func import Bcrypt
1✔
19
from Oauth import google_drive_api
1✔
20
import random
1✔
21
import string
1✔
22
from Email import send as send_email
1✔
23
from database.email_verification_repo import EmailVerificationToken as EmailVerificationToken_db
1✔
24
from CryptoClasses.sign_func import API_signature
1✔
25
from CryptoClasses import refresh_token as refresh_token_func
1✔
26
import Oauth.oauth_flow as oauth_flow
1✔
27
import Utils.utils as utils
1✔
28
import os
1✔
29
import base64
1✔
30
import datetime
1✔
31
from Utils.security_wrapper import  require_valid_user, require_passphrase_verification,require_valid_user, require_userid, ip_rate_limit
1✔
32
import traceback
1✔
33
from hashlib import sha256
1✔
34
from CryptoClasses.encryption import ServiceSideEncryption 
1✔
35
from database.db import db
1✔
36
import threading
1✔
37
from uuid import uuid4
1✔
38
import hmac
1✔
39
import hashlib
1✔
40
from sqlalchemy import text
1✔
41
 
42

43

44

45

46

47

48
if conf.environment.type == "development":
1✔
49
    logging.getLogger().setLevel(logging.INFO)
1✔
50

51

52
# POST /signup
53
def signup():
1✔
54
    try:
1✔
55
        data = request.get_json()
1✔
56
        username = utils.sanitize_input(data["username"].strip())
1✔
57
        passphrase = data["password"].strip()
1✔
58
        email = utils.sanitize_input(data["email"].strip())
1✔
59
        derivedKeySalt = utils.sanitize_input(data["derivedKeySalt"].strip())
1✔
60
        zke_key = utils.sanitize_input(data["ZKE_key"].strip())
1✔
61
        passphraseSalt = utils.sanitize_input(data["passphraseSalt"].strip())
1✔
62
    except Exception as e: # pragma: no cover
63
        logging.info(e)
64
        return {"message": "Invalid request"}, 400
65
    
66
    if not username or not passphrase or not email or not derivedKeySalt or not zke_key or not passphraseSalt:
1✔
67
        return {"message": "Missing parameters"}, 400
1✔
68
    if len(username) > 250:
1✔
69
        return {"message": "Username is too long"}, 400
1✔
70
    if not utils.check_email(email) :
1✔
71
        return {"message": "Bad email format"}, 401
1✔
72
    userDB = UserDB()
1✔
73
    user = userDB.getByEmail(email)
1✔
74
    if user:
1✔
UNCOV
75
        return {"message": "User already exists"}, 409
×
76
    check_username = userDB.getByUsername(username)
1✔
77
    if check_username:
1✔
78
        return {"message": "Username already exists"}, 409
1✔
79
    bcrypt = Bcrypt(passphrase)
1✔
80
    try : 
1✔
81
        hashedpw = bcrypt.hashpw()
1✔
82
    except ValueError as e:
1✔
83
        logging.debug(e)
1✔
84
        return {"message": "Password is too long"}, 400
1✔
UNCOV
85
    except Exception as e:
×
86
        logging.warning("Uknown error occured while hashing password" + str(e))
×
87
        return {"message": "Unknown error while hashing your password"}, 500
×
88
    try:
1✔
89
        today = datetime.datetime.now().strftime("%d/%m/%Y")
1✔
90
        user = userDB.create(username=username, email=email, password=hashedpw, randomSalt=derivedKeySalt, isVerified=0,isBlocked=0, passphraseSalt=passphraseSalt, today=today)
1✔
UNCOV
91
    except Exception as e:
×
UNCOV
92
        logging.error("Unknown error while creating user" + str(e))
×
93
        return {"message": "Unknown error while creating user"}, 500
×
94
    if user :
1✔
95
        try:
1✔
96
            zke_db = ZKE_DB()
1✔
97
            zke_key = zke_db.create(user.id, zke_key)
1✔
UNCOV
98
        except Exception as e:
×
UNCOV
99
           zke_key = None
×
100
        if zke_key:
1✔
101
            _,session_token = SessionTokenRepo().generate_session_token(user.id)
1✔
102
            if conf.features.emails.require_email_validation:
1✔
103
                try:
1✔
104
                    send_verification_email(user=user.id, context_={"user":user.id}, token_info={"user":user.id})
1✔
105
                except Exception as e:
1✔
106
                    logging.error("Unknown error while sending verification email" + str(e))
1✔
107
            response = Response(status=201, mimetype="application/json", response=json.dumps({"message": "User created"}))
1✔
108
            response.set_cookie("session-token", session_token, httponly=True, secure=True, samesite="Lax", max_age=3600)
1✔
109
            return response
1✔
110
        else :
111
            userDB.delete(user.id)
×
UNCOV
112
            logging.error("Unknown error while storing user ZKE keys" + str(username))
×
UNCOV
113
            return {"message": "Unknown error while registering user encrypted keys"}, 500
×
114
    else :
UNCOV
115
        logging.error("Unknown error while creating user" + str(username))
×
UNCOV
116
        return {"message": "Unknown error while creating user"}, 500
×
117

118

119

120
# POST /login
121
@ip_rate_limit
1✔
122
def login(ip, body):
1✔
123
    passphrase = body["password"].strip()
1✔
124
    email = utils.sanitize_input(body["email"]).strip()
1✔
125
    rate_limiting_db = Rate_Limiting_DB()
1✔
126
    if not passphrase or not email:
1✔
127
        return {"message": "generic_errors.missing_params"}, 400
1✔
128
    if(not utils.check_email(email) ):
1✔
129
        return {"message": "generic_errors.bad_email"}, 403
1✔
130
    userDB = UserDB()
1✔
131
    user = userDB.getByEmail(email)
1✔
132
    bcrypt = Bcrypt(passphrase)
1✔
133
    if not user:
1✔
134
        logging.info("User " + str(email) + " tried to login but does not exist. A fake password is checked to avoid timing attacks")
1✔
135
        fakePassword = ''.join(random.choices(string.ascii_letters, k=random.randint(10, 20)))
1✔
136
        bcrypt.checkpw(fakePassword)
1✔
137
        
138
        rate_limiting_db.add_failed_login(ip)
1✔
139
        return {"message": "generic_errors.invalid_creds"}, 403
1✔
140
    logging.info(f"User {user.id} is trying to logging in from gateway {request.remote_addr} and IP {ip}. X-Forwarded-For header is {request.headers.get('X-Forwarded-For')}")
1✔
141
    checked = bcrypt.checkpw(user.password)
1✔
142
    if not checked:
1✔
143
        rate_limiting_db.add_failed_login(ip, user.id)
1✔
144
        return {"message": "generic_errors.invalid_creds"}, 403
1✔
145
    if user.isBlocked: # only authenticated users can see the blocked status
1✔
146
        return {"message": "blocked"}, 403
1✔
147
    
148
    rate_limiting_db.flush_login_limit(ip)
1✔
149
    session_id, session_token = SessionTokenRepo().generate_session_token(user.id)
1✔
150
    refresh_token = refresh_token_func.generate_refresh_token(user.id, session_id)
1✔
151
    if not conf.features.emails.require_email_validation: # we fake the isVerified status if email validation is not required
1✔
UNCOV
152
        response = Response(status=200, mimetype="application/json", response=json.dumps({"username": user.username, "id":user.id, "derivedKeySalt":user.derivedKeySalt, "isGoogleDriveSync": GoogleDriveIntegrationDB().is_google_drive_enabled(user.id), "role":user.role, "isVerified":True}))
×
153
    elif user.isVerified:
1✔
154
        response = Response(status=200, mimetype="application/json", response=json.dumps({"username": user.username, "id":user.id, "derivedKeySalt":user.derivedKeySalt, "isGoogleDriveSync": GoogleDriveIntegrationDB().is_google_drive_enabled(user.id), "role":user.role, "isVerified":user.isVerified}))
1✔
155
    else:
156
        response = Response(status=200, mimetype="application/json", response=json.dumps({"isVerified":user.isVerified}))
1✔
157
    userDB.update_last_login_date(user.id)
1✔
158
    response.set_auth_cookies(session_token, refresh_token)
1✔
159
    return response
1✔
160

161
#POST logout
162
@require_valid_user
1✔
163
def logout(_):
1✔
164
    session_token = request.cookies.get("session-token")
1✔
165
    session_repo = SessionTokenRepo()
1✔
166
    session = session_repo.get_session_token(session_token)
1✔
167
    if not session:
1✔
UNCOV
168
        return {"message": "Session not found"}, 404
×
169
    utils.revoke_session(session_id=session.id)
1✔
170
    response = Response(status=200, mimetype="application/json", response=json.dumps({"message": "Logged out"}))
1✔
171
    response.delete_cookie("session-token")
1✔
172
    response.delete_cookie("refresh-token")
1✔
173
    return response
1✔
174

175

176
#GET /login/specs
177
def get_login_specs(username):
1✔
178
    rate_limiting_db = Rate_Limiting_DB()
1✔
179
    logging.debug("User " + str(username) + " is trying to get login specs")
1✔
180
    ip = utils.get_ip(request)
1✔
181
    if ip:
1✔
182
        if rate_limiting_db.is_login_rate_limited(ip):
1✔
183
            return {"message": "Too many requests", 'ban_time':conf.features.rate_limiting.login_ban_time}, 429
1✔
184
    if(not utils.check_email(username)):
1✔
185
        return {"message": "Bad request"}, 400
1✔
186
    userDB = UserDB()
1✔
187
    user = userDB.getByEmail(username)
1✔
188
    if user :
1✔
189
        return {"passphrase_salt": user.passphraseSalt}, 200
1✔
190
    else :
191
        fake_salt = base64.b64encode(os.urandom(16)).decode("utf-8")
1✔
192
        return {"passphrase_salt": fake_salt}, 200
1✔
193

194
    
195

196
    
197
    
198
#GET /encrypted_secret/{uuid}
199
@require_valid_user
1✔
200
def get_encrypted_secret(user_id, uuid):
1✔
201
    totp_secretDB =  TOTP_secretDB()
1✔
202
    enc_secret = totp_secretDB.get_enc_secret_of_user_by_uuid(user_id, uuid)
1✔
203
    if not enc_secret:
1✔
204
        return {"message": "Forbidden"}, 403
1✔
205
    else:
206
        if enc_secret.user_id == user_id:
1✔
207
            return {"enc_secret": enc_secret.secret_enc}, 200
1✔
208
        else :    
UNCOV
209
            logging.warning("User " + str(user_id) + " tried to access secret " + str(uuid) + " which is not his")
×
UNCOV
210
            return {"message": "Forbidden"}, 403
×
211
        
212
#POST /encrypted_secret
213
@require_valid_user
1✔
214
def add_encrypted_secret(user_id, body):
1✔
215
    enc_secret = utils.sanitize_input(body["enc_secret"]).strip()
1✔
216
    secret_uuid = ""
1✔
217
    totp_secretDB =  TOTP_secretDB()
1✔
218
    while secret_uuid == "":
1✔
219
        tmp_uuid = str(uuid4())
1✔
220
        if not totp_secretDB.get_enc_secret_by_uuid(tmp_uuid):
1✔
221
            secret_uuid = tmp_uuid
1✔
222
        
223
    if totp_secretDB.add(user_id, enc_secret, secret_uuid):
1✔
224
        return {"uuid": secret_uuid}, 201
1✔
225
    else :
UNCOV
226
        logging.warning("Unknown error while adding encrypted secret for user " + str(user_id))
×
UNCOV
227
        return {"message": "Unknown error while adding encrypted secret"}, 500
×
228

229
#PUT /encrypted_secret/{uuid}
230
@require_valid_user
1✔
231
def update_encrypted_secret(user_id,uuid, body):
1✔
232
    enc_secret = body["enc_secret"]
1✔
233
    
234
    totp_secretDB =  TOTP_secretDB()
1✔
235
    totp = totp_secretDB.get_enc_secret_of_user_by_uuid(user_id, uuid)
1✔
236
    if not totp:
1✔
237
        logging.warning("User " + str(user_id) + " tried to update secret " + str(uuid) + " which does not exist")
1✔
238
        return {"message": "Forbidden"}, 403
1✔
239
    else:
240
        if totp.user_id != user_id:
1✔
241
            logging.warning("User " + str(user_id) + " tried to update secret " + str(uuid) + " which is not his")
×
UNCOV
242
            return {"message": "Forbidden"}, 403
×
243
        totp = totp_secretDB.update_secret(uuid=uuid, enc_secret=enc_secret, user_id=user_id)
1✔
244
        if totp == None:
1✔
UNCOV
245
                logging.warning("User " + str(user_id) + " tried to update secret " + str(uuid) + " but an error occurred server side while storing your encrypted secret")
×
UNCOV
246
                return {"message": "An error occurred server side while storing your encrypted secret"}, 500
×
247
        else:
248
                return {"message": "Encrypted secret updated"}, 201
1✔
249

250
#DELETE /encrypted_secret/{uuid}
251
@require_valid_user
1✔
252
def delete_encrypted_secret(user_id,uuid):
1✔
253
    if(uuid == ""):
1✔
UNCOV
254
        return {"message": "Invalid request"}, 400
×
255
    totp_secretDB =  TOTP_secretDB()
1✔
256
    totp = totp_secretDB.get_enc_secret_of_user_by_uuid(user_id, uuid)
1✔
257
    if not totp:
1✔
258
        logging.debug("User " + str(user_id) + " tried to delete secret " + str(uuid) + " which does not exist")
1✔
259
        return {"message": "Forbidden"}, 403
1✔
260
    else:
261
        if totp.user_id != user_id:
1✔
262
            logging.warning("User " + str(user_id) + " tried to delete secret " + str(uuid) + " which is not his")
×
263
            return {"message": "Forbidden"}, 403
×
264
        if totp_secretDB.delete(uuid=uuid, user_id= user_id):
1✔
265
            return {"message": "Encrypted secret deleted"}, 201
1✔
266
        else:
UNCOV
267
            logging.warning("Unknown error while deleting encrypted secret for user " + str(user_id) )
×
UNCOV
268
            return {"message": "Unknown error while deleting encrypted secret"}, 500
×
269
        
270

271
#GET /all_secrets
272
@require_valid_user
1✔
273
def get_all_secrets(user_id):
1✔
274
    totp_secretDB =  TOTP_secretDB()
1✔
275
    enc_secrets = totp_secretDB.get_all_enc_secret_by_user_id(user_id)
1✔
276
    if not enc_secrets:
1✔
277
        return {"message": "No secret found"}, 404
1✔
278
    else:
279
        secrets = []
1✔
280
        for enc_secret in enc_secrets:
1✔
281
           secret = {"uuid": enc_secret.uuid, "enc_secret": enc_secret.secret_enc}
1✔
282
           secrets.append(secret)
1✔
283
        return {"enc_secrets": secrets}, 200
1✔
284

285

286
#GET /zke_encrypted_key
287
@require_valid_user
1✔
288
def get_ZKE_encrypted_key(user_id):
1✔
289
        logging.info(user_id)
1✔
290
        zke_db = ZKE_DB()
1✔
291
        zke_key = zke_db.getByUserId(user_id)
1✔
292
        if zke_key:
1✔
293
                return {"zke_encrypted_key": zke_key.ZKE_key}, 200
1✔
294
        else:
UNCOV
295
            return {"message": "No ZKE key found for this user"}, 404
×
296

297

298

299
#PUT /update/email
300
@require_userid
1✔
301
def update_email(user_id,body):
1✔
302
   
303
    email = utils.sanitize_input(body["email"]).strip()
1✔
304
    if not utils.check_email(email):
1✔
305
        return {"message": "This email doesn't have the right format. Check it and try again"}, 400
1✔
306
         
307
    userDb = UserDB()
1✔
308
    already_existing_user = userDb.getByEmail(email)
1✔
309
    if already_existing_user:
1✔
310
        if already_existing_user.id == user_id:
1✔
311
            return {"message":email},201
1✔
312
        else:
313
            return {"message": "This email already exists"}, 403
1✔
314
    old_mail = userDb.getById(user_id).mail
1✔
315
    user = userDb.update_email(user_id=user_id, email=email, isVerified=0)
1✔
316
    if user:
1✔
317
        try:
1✔
318
            ip = utils.get_ip(request)
1✔
319
            thread = threading.Thread(target=utils.send_information_email,args=(ip, old_mail, "Your email address has been updated"))
1✔
320
            thread.start()
1✔
UNCOV
321
        except Exception as e:
×
UNCOV
322
            logging.error("Unknown error while sending information email" + str(e))
×
323
        if conf.features.emails.require_email_validation:
1✔
324
            try:
1✔
325
           
326
                send_verification_email(user=user_id, context_={"user":user_id}, token_info={"user":user_id})
1✔
327
            except Exception as e:
1✔
328
                logging.error("Unknown error while sending verification email" + str(e))
1✔
329
            return {"message":user.mail},201
1✔
330
        else:
UNCOV
331
            return {"message":user.mail},201
×
332
    else :
UNCOV
333
        logging.warning("An error occured while updating email of user " + str(user_id))
×
UNCOV
334
        return {"message": "Unknown error while updating email"}, 500
×
335

336
#PUT /update/username
337
@require_valid_user
1✔
338
def update_username(user_id,body):
1✔
339
    username = utils.sanitize_input(body["username"].strip())
1✔
340
    if not username:
1✔
UNCOV
341
        return {"message": "generic_errors.missing_params"}, 400
×
342
    userDb = UserDB()
1✔
343
    if len(username) > 250:
1✔
344
        return {"message": "Username is too long"}, 400
1✔
345
    already_existing_user = userDb.getByUsername(username)
1✔
346
    if already_existing_user:
1✔
347
        if already_existing_user.id == user_id:
1✔
348
            return {"message":username},201
1✔
349
        else:
350
            return {"message": "generic_errors.username_exists"}, 409
1✔
351
    user = userDb.update_username(user_id=user_id, username=username)
1✔
352
    if user:
1✔
353
        return {"message":user.username},201
1✔
354
    else :
UNCOV
355
        logging.warning("An error occured while updating username of user " + str(user_id))
×
UNCOV
356
        return {"message": "Unknown error while updating username"}, 500
×
357
   
358
#PUT /update/vault 
359
@require_valid_user
1✔
360
def update_vault(user_id, body):
1✔
361
    returnJson = {"message": "Internal server error", "hashing":-1, "totp":-1, "user":-1, "zke":-1}
1✔
362
    try:
1✔
363
        newPassphrase = body["new_passphrase"].strip()
1✔
364
        old_passphrase = body["old_passphrase"].strip()
1✔
365
        enc_vault = body["enc_vault"].strip()
1✔
366
        enc_vault = json.loads(enc_vault)
1✔
367
        zke_key = body["zke_enc"].strip()
1✔
368
        passphrase_salt = body["passphrase_salt"].strip()
1✔
369
        derivedKeySalt = body["derived_key_salt"].strip()
1✔
370
    except Exception as e:
1✔
371
        logging.error(e)
1✔
372
        return '{"message": "Invalid request"}', 400
1✔
373

374
    if not newPassphrase or not old_passphrase or not zke_key or not passphrase_salt or not derivedKeySalt:
1✔
375
        return {"message": "Missing parameters"}, 400
1✔
376
    
377
    
378
    
379
    is_vault_valid, vault_validation_msg = utils.unsafe_json_vault_validation(enc_vault)
1✔
380
    if not is_vault_valid:
1✔
381
        return {"message": vault_validation_msg}, 400
1✔
382
    userDb = UserDB()
1✔
383
    zke_db = ZKE_DB()
1✔
384
    totp_secretDB = TOTP_secretDB()
1✔
385

386
    user = userDb.getById(user_id)
1✔
387
    bcrypt = Bcrypt(old_passphrase)
1✔
388
    if not bcrypt.checkpw(user.password):
1✔
389
        logging.info("User " + str(user_id) + " tried to update his vault but provided passphrase is wrong.")
1✔
390
        return {"message": "Invalid passphrase"}, 403
1✔
391
    bcrypt = Bcrypt(newPassphrase)
1✔
392
    try :
1✔
393
        hashedpw = bcrypt.hashpw()
1✔
394
    except ValueError as e:
1✔
395
        logging.warning(e)
1✔
396
        return {"message": "passphrase too long. It must be <70 char"}, 400
1✔
UNCOV
397
    except Exception as e:
×
UNCOV
398
        logging.warning("Uknown error occured while hashing password" + str(e))
×
UNCOV
399
        returnJson["hashing"]=0
×
UNCOV
400
        return returnJson, 500
×
401
    
402
    old_vault = TOTP_secretDB().get_all_enc_secret_by_user_id(user_id)
1✔
403
    if len(old_vault) != len(enc_vault):
1✔
404
        logging.warning(f"User {user_id} tried to update his vault but the number of secrets in the new vault is different from the old one. The update is rejected.")
1✔
405
        return {"message": "To avoid the loss of your information, Zero-TOTP is rejecting this request because it has detected that you might lose data. Please contact quickly Zero-TOTP developers to fix issue."}, 400
1✔
406
    for secret in old_vault:
1✔
407
        if secret.uuid not in enc_vault:
1✔
408
            logging.warning(f"FORBIDDEN. The user {user_id} tried to update but the secret {secret.uuid} is missing in the new vault. The update is rejected.")
1✔
409
            return {"message": "Forbidden action. Zero-TOTP detected that you were updating object you don't have access to. The request is rejected."}, 403
1✔
410
    
411

412
    returnJson["hashing"]=1
1✔
413
    errors = 0
1✔
414
    for secret in enc_vault.keys():
1✔
415
        totp = totp_secretDB.get_enc_secret_of_user_by_uuid(user_id, secret)
1✔
416
        if not totp:
1✔
UNCOV
417
            totp = totp_secretDB.add(user_id=user_id, enc_secret=enc_vault[secret], uuid=secret)
×
418
            if not totp:
×
419
                logging.warning("Unknown error while adding encrypted secret for user " + str(user_id))
×
UNCOV
420
                errors = 1
×
421
        else:
422
            if totp.user_id != user_id:
1✔
423
                logging.warning("User " + str(user_id) + " tried to update secret " + str(secret) + " which is not his")
×
424
                errors = 1
×
425
            else :
426
                totp = totp_secretDB.update_secret(uuid=secret, enc_secret=enc_vault[secret], user_id=user_id)
1✔
427
                if totp == None:
1✔
UNCOV
428
                    logging.warning("User " + str(user_id) + " tried to update secret " + str(secret) + " but an error occurred server side while storing your  encrypted secret")
×
UNCOV
429
                    errors = 1
×
430
    zke = zke_db.update(user_id, zke_key)
1✔
431
    userUpdated = userDb.update(user_id=user_id, passphrase=hashedpw, passphrase_salt=passphrase_salt, derivedKeySalt=derivedKeySalt)
1✔
432
    returnJson["totp"]=1 if errors == 0 else 0
1✔
433
    returnJson["user"]=1 if userUpdated else 0
1✔
434
    returnJson["zke"]=1 if zke else 0
1✔
435
    if errors == 0 and userUpdated and zke:
1✔
436
        try:
1✔
437
            ip = utils.get_ip(request)
1✔
438
            thread = threading.Thread(target=utils.send_information_email,args=(ip, user.mail, "Your vault passphrase has been updated"))
1✔
439
            thread.start()
1✔
440
        except Exception as e:
×
UNCOV
441
            logging.error("Unknown error while sending information email" + str(e))
×
442
        return {"message": "Vault updated"}, 201
1✔
443
    else:
UNCOV
444
        logging.warning("An error occured while updating passphrase of user " + str(user_id))
×
UNCOV
445
        return returnJson, 500
×
446

447

448
@require_valid_user
1✔
449
def export_vault(user_id):
1✔
450
    
451
    vault = {"version":1, "date": str(datetime.datetime.utcnow())}
1✔
452
    user = UserDB().getById(user_id=user_id)
1✔
453
    zkeKey = ZKE_DB().getByUserId(user_id=user_id)
1✔
454
    totp_secrets_list = TOTP_secretDB().get_all_enc_secret_by_user_id(user_id=user_id)
1✔
455
    if not user or not zkeKey:
1✔
UNCOV
456
        return {"message" : "User not found"}, 404
×
457
    
458
    vault["derived_key_salt"] = user.derivedKeySalt
1✔
459
    vault["zke_key_enc"] = zkeKey.ZKE_key
1✔
460
    secrets = utils.get_all_secrets_sorted(totp_secrets_list)
1✔
461
    vault["secrets"] = secrets
1✔
462
    vault["secrets_sha256sum"] = sha256(json.dumps(vault["secrets"],  sort_keys=True).encode("utf-8")).hexdigest()
1✔
463
    vault_b64 = base64.b64encode(json.dumps(vault).encode("utf-8")).decode("utf-8")
1✔
464
    signature = API_signature().sign_rsa(vault_b64)
1✔
465
    vault = vault_b64 + "," + signature
1✔
466
    return vault, 200
1✔
467

468
# GET /role
469
@require_userid
1✔
470
def get_role(user_id, *args, **kwargs):
1✔
471
    user = UserDB().getById(user_id=user_id)
1✔
472
    if not user:
1✔
UNCOV
473
        return {"message" : "User not found"}, 404
×
474
    elif not user.isVerified and conf.features.emails.require_email_validation:
1✔
475
        return {"role" : "not_verified"}, 200
1✔
476
    return {"role": user.role}, 200
1✔
477

478

479
    
480
# GET /google-drive/oauth/authorization_flow
481
def get_authorization_flow():
1✔
482
    if not conf.api.oauth:
1✔
UNCOV
483
        return {"message": "Oauth is disabled on this tenant. Contact the tenant administrator to enable it."}, 403 
×
484
    authorization_url, state = oauth_flow.get_authorization_url()
1✔
485
    flask.session["state"] = state
1✔
486
    logging.info("State stored in session : " + str(state))
1✔
487
    return {"authorization_url": authorization_url, "state":state}, 200
1✔
488

489
# GET /google-drive/oauth/callback
490
@require_valid_user
1✔
491
def oauth_callback(user_id):
1✔
492
    frontend_URI = conf.environment.frontend_URI
1✔
493
    try: 
1✔
494
        credentials = oauth_flow.get_credentials(request.url, flask.session["state"])
1✔
495

496
        if credentials == None:
1✔
497
            response = make_response(redirect(frontend_URI + "/oauth/callback?status=error&state="+str(flask.session["state"]),  code=302))
1✔
498
            flask.session.pop("state")
1✔
499
            return response
1✔
500

501
        response = make_response(redirect(frontend_URI + "/oauth/callback?status=success&state="+str(flask.session["state"]),    code=302))
1✔
502
        creds_b64 = base64.b64encode(json.dumps(credentials).encode("utf-8")).decode("utf-8")
1✔
503
        sse = ServiceSideEncryption()
1✔
504
        encrypted_cipher = sse.encrypt(creds_b64)
1✔
505
        expires_at = int(datetime.datetime.strptime(credentials["expiry"], "%Y-%m-%d %H:%M:%S.%f").timestamp())
1✔
506
        token_db = Oauth_tokens_db()
1✔
507
        tokens = token_db.get_by_user_id(user_id)
1✔
508
        if tokens:
1✔
509
            tokens = token_db.update(user_id=user_id, enc_credentials=encrypted_cipher["ciphertext"] ,expires_at=expires_at, nonce=encrypted_cipher["nonce"], tag=encrypted_cipher["tag"])
1✔
510
        else:
511
            tokens = token_db.add(user_id=user_id, enc_credentials=encrypted_cipher["ciphertext"], expires_at=expires_at, nonce=encrypted_cipher["nonce"], tag=encrypted_cipher["tag"])
1✔
512
        if tokens:
1✔
513
            google_drive_int = GoogleDriveIntegrationDB()
1✔
514
            integration = google_drive_int.get_by_user_id(user_id)
1✔
515
            if integration == None:
1✔
516
                google_drive_int.create(user_id=user_id, google_drive_sync=1)
1✔
517
            else :
518
                google_drive_int.update_google_drive_sync(user_id=user_id, google_drive_sync=1)
1✔
519
            flask.session.pop("state")
1✔
520
            return response
1✔
521
        else:
522
            logging.warning("Unknown error while storing encrypted tokens for user " + str(user_id))
1✔
523
            response = make_response(redirect(frontend_URI + "/oauth/callback?status=error&state="+flask.session.get('state'),  code=302))
1✔
524
            flask.session.pop("state")
1✔
525
            return response
1✔
526
    except Exception as e:
1✔
527
        logging.error("Error while exchanging the authorization code " + str(e))
1✔
528
        logging.error(traceback.format_exc())
1✔
529
        if flask.session.get("state"):
1✔
530
            response = make_response(redirect(frontend_URI + "/oauth/callback?status=error&state="+flask.session.get('state'),  code=302))
1✔
531
            flask.session.pop("state")
1✔
532
        else :
533
            response = make_response(redirect(frontend_URI + "/oauth/callback?status=error&state=none",  code=302))
1✔
534
        return response
1✔
535

536

537

538
#GET /google-drive/option
539
@require_valid_user
1✔
540
def get_google_drive_option(user_id):
1✔
541
    google_drive_integrations = GoogleDriveIntegrationDB()
1✔
542
    status = google_drive_integrations.is_google_drive_enabled(user_id)
1✔
543
    if status:
1✔
544
        return {"status": "enabled"}, 200
1✔
545
    else:
546
        return {"status": "disabled"}, 200
1✔
547
    
548
#PUT /google-drive/backup
549
@require_valid_user
1✔
550
def backup_to_google_drive(user_id, *args, **kwargs):
1✔
551
    
552
    token_db = Oauth_tokens_db()
1✔
553
    oauth_tokens = token_db.get_by_user_id(user_id)
1✔
554
    google_drive_integrations = GoogleDriveIntegrationDB()
1✔
555

556
    if not oauth_tokens or not google_drive_integrations.is_google_drive_enabled(user_id):
1✔
557
        return {"message": "Google drive sync is not enabled"}, 403
1✔
558
    sse = ServiceSideEncryption()
1✔
559
    creds_b64 = sse.decrypt( ciphertext=oauth_tokens.enc_credentials, nonce=oauth_tokens.cipher_nonce, tag=oauth_tokens.cipher_tag)
1✔
560
    if creds_b64 == None:
1✔
561
        logging.warning("Error while decrypting credentials for user " + str(user_id))
1✔
562
        return {"message": "Error while decrypting credentials"}, 500
1✔
563
    credentials = json.loads(base64.b64decode(creds_b64).decode("utf-8"))
1✔
564
    try:
1✔
565
        exported_vault,_ = export_vault(user=user_id, context_={"user":user_id}, token_info={"user":user_id})
1✔
566
        google_drive_api.backup(credentials=credentials, vault=exported_vault)
1✔
567
        google_drive_api.clean_backup_retention(credentials=credentials, user_id=user_id)
1✔
568
        return {"message": "Backup done"}, 201
1✔
569
    except Exception as e:
1✔
570
        logging.error("Error while backing up to google drive " + str(e))
1✔
571
        return {"message": "Error while backing up to google drive"}, 500
1✔
572

573

574
@require_valid_user
1✔
575
def verify_last_backup(user_id):
1✔
576
    token_db = Oauth_tokens_db()
1✔
577
    oauth_tokens = token_db.get_by_user_id(user_id)
1✔
578
    google_drive_integrations = GoogleDriveIntegrationDB()
1✔
579
    if not oauth_tokens or not google_drive_integrations.is_google_drive_enabled(user_id):
1✔
580
        return {"message": "Google drive sync is not enabled"}, 403
1✔
581
    sse = ServiceSideEncryption()
1✔
582
    creds_b64 = sse.decrypt( ciphertext=oauth_tokens.enc_credentials, nonce=oauth_tokens.cipher_nonce, tag=oauth_tokens.cipher_tag)
1✔
583
    if creds_b64 == None:
1✔
584
        logging.error("Error while decrypting credentials for user " + str(user_id) + ". creds_b64 = " + str(creds_b64))
1✔
585
        return {"error": "Error while connecting to the Google API"}, 500
1✔
586
    
587
    
588
    credentials = json.loads(base64.b64decode(creds_b64).decode("utf-8"))
1✔
589
    try:
1✔
590
        last_backup_checksum, last_backup_date = google_drive_api.get_last_backup_checksum(credentials)
1✔
591
    except utils.CorruptedFile as e:
1✔
592
        logging.warning("Error while getting last backup checksum " + str(e))
1✔
593
        return {"status": "corrupted_file"}, 200
1✔
594
    except utils.FileNotFound as e:
1✔
595
        logging.warning("Error while getting last backup checksum " + str(e))
1✔
596
        return {"error": "file_not_found"}, 404
1✔
597
    totp_secrets_list = TOTP_secretDB().get_all_enc_secret_by_user_id(user_id=user_id)
1✔
598
    secrets = utils.get_all_secrets_sorted(totp_secrets_list)
1✔
599
    sha256sum = sha256(json.dumps(secrets,  sort_keys=True).encode("utf-8")).hexdigest()
1✔
600
    if last_backup_checksum == sha256sum:
1✔
601
        google_drive_api.clean_backup_retention(credentials=credentials, user_id=user_id)
1✔
602
        return {"status": "ok", "is_up_to_date": True, "last_backup_date": last_backup_date }, 200
1✔
603
    else:
604
        return {"status": "ok", "is_up_to_date": False, "last_backup_date": "" }, 200
1✔
605

606

607
@require_valid_user
1✔
608
def delete_google_drive_option(user_id):
1✔
609
    google_integration = GoogleDriveIntegrationDB()
1✔
610
    token_db = Oauth_tokens_db()
1✔
611
    oauth_tokens = token_db.get_by_user_id(user_id)
1✔
612
    
613
    if google_integration.get_by_user_id(user_id) is None:
1✔
614
        google_integration.create(user_id, 0)
1✔
615
    if not oauth_tokens:
1✔
616
        GoogleDriveIntegrationDB().update_google_drive_sync(user_id, 0)
1✔
617
        return {"message": "Google drive sync is not enabled"}, 200
1✔
618
    sse = ServiceSideEncryption()
1✔
619
    try:
1✔
620
        creds_b64 = sse.decrypt( ciphertext=oauth_tokens.enc_credentials, nonce=oauth_tokens.cipher_nonce,  tag=oauth_tokens.cipher_tag)
1✔
621
        if creds_b64 == None:
1✔
622
            token_db.delete(user_id)
1✔
623
            GoogleDriveIntegrationDB().update_google_drive_sync(user_id, 0)
1✔
624
            return {"message": "Error while decrypting credentials"}, 200
1✔
625
        credentials = json.loads(base64.b64decode(creds_b64).decode("utf-8"))
1✔
626
        google_drive_api.revoke_credentials(credentials)
1✔
627
        token_db.delete(user_id)
1✔
628
        GoogleDriveIntegrationDB().update_google_drive_sync(user_id, 0)
1✔
629
        return {"message": "Google drive sync disabled"}, 200
1✔
630
    except Exception as e:
1✔
631
        logging.error("Error while deleting backup from google drive " + str(e))
1✔
632
        token_db.delete(user_id)
1✔
633
        GoogleDriveIntegrationDB().update_google_drive_sync(user_id, 0)
1✔
634
        return {"message": "Error while revoking credentials"}, 200
1✔
635

636
@require_valid_user
1✔
637
def get_preferences(user_id,fields):
1✔
638
    valid_fields = [ "favicon_policy", "derivation_iteration", "backup_lifetime", "backup_minimum", "autolock_delay"]
1✔
639
    all_field = fields == "all" 
1✔
640
    fields_asked = []
1✔
641
    if not all_field:
1✔
642
        fields = fields.split(",")
1✔
643
        for field in fields:
1✔
644
            if field not in fields_asked:
1✔
645
                for valid_field in valid_fields:
1✔
646
                    if field == valid_field:
1✔
647
                        fields_asked.append(valid_field)
1✔
648

649
        if len(fields_asked) == 0:
1✔
650
            return {"message": "Invalid request"}, 400
1✔
651
    
652
    user_preferences = {}
1✔
653
    preferences_db = PreferencesDB()
1✔
654
    preferences = preferences_db.get_preferences_by_user_id(user_id)
1✔
655
    if "favicon_policy" in fields_asked or all_field:
1✔
656
        user_preferences["favicon_policy"] = preferences.favicon_preview_policy
1✔
657
    if  "derivation_iteration" in fields_asked or all_field:
1✔
658
        user_preferences["derivation_iteration"] = preferences.derivation_iteration
1✔
659
    if "backup_lifetime" in fields_asked or all_field:
1✔
660
        user_preferences["backup_lifetime"] = preferences.backup_lifetime
1✔
661
    if "backup_minimum" in fields_asked or all_field:
1✔
662
        user_preferences["backup_minimum"] = preferences.minimum_backup_kept
1✔
663
    if "autolock_delay" in fields_asked or all_field:
1✔
664
        user_preferences["autolock_delay"] = preferences.vault_autolock_delay_min
1✔
665
    return user_preferences, 200
1✔
666

667

668
@require_valid_user
1✔
669
def set_preference(user_id, body):
1✔
670
    field = body["id"]
1✔
671
    value = body["value"]
1✔
672
    
673
    valid_fields = [ "favicon_policy", "derivation_iteration", "backup_lifetime", "backup_minimum", "autolock_delay"]
1✔
674
    if field not in valid_fields:
1✔
675
        return {"message": "Invalid request"}, 400
1✔
676
    preferences_db = PreferencesDB()
1✔
677
    if field == "favicon_policy":
1✔
678
        if value not in ["always", "never", "enabledOnly"]:
1✔
679
            return {"message": "Invalid request"}, 400
1✔
680
        preferences = preferences_db.update_favicon(user_id, value)
1✔
681
        if preferences:
1✔
682
            return {"message": "Preference updated"}, 201
1✔
683
        else:# pragma: no cover
684
            return {"message": "Unknown error while updating preference"}, 500
685
    elif field == "derivation_iteration":
1✔
686
        try:
1✔
687
            value = int(value)
1✔
688
        except:
1✔
689
            return {"message": "Invalid request"}, 400
1✔
690
        if value < 1000 or value > 1000000:
1✔
691
            return {"message": "iteration must be between 1000 and 1000000 "}, 400
1✔
692
        preferences = preferences_db.update_derivation_iteration(user_id, value)
1✔
693
        if preferences:
1✔
694
            return {"message": "Preference updated"}, 201
1✔
695
        else:# pragma: no cover
696
            return {"message": "Unknown error while updating preference"}, 500
697
    elif field == "backup_lifetime":
1✔
698
        try:
1✔
699
            value = int(value)
1✔
700
        except:
1✔
701
            return {"message": "Invalid request"}, 400
1✔
702
        if value < 1 :
1✔
703
            return {"message": "backup lifetime must be at least day"}, 400
1✔
704
        preferences = preferences_db.update_backup_lifetime(user_id, value)
1✔
705
        if preferences:
1✔
706
            return {"message": "Preference updated"}, 201
1✔
707
        else:# pragma: no cover
708
            return {"message": "Unknown error while updating preference"}, 500
709
    elif field == "backup_minimum":
1✔
710
        try:
1✔
711
            value = int(value)
1✔
712
        except:
1✔
713
            return {"message": "Invalid request"}, 400
1✔
714
        if value < 1 :
1✔
715
            return {"message": "minimum backup kept must be at least of 1"}, 400
1✔
716
        preferences = preferences_db.update_minimum_backup_kept(user_id, value)
1✔
717
        if preferences:
1✔
718
            return {"message": "Preference updated"}, 201
1✔
719
        else:# pragma: no cover
720
            return {"message": "Unknown error while updating preference"}, 500
721
    elif field == "autolock_delay":
1✔
722
        minimum_delay = 1
1✔
723
        maximum_delay = conf.api.refresh_token_validity/60 # autolock is limited by the ability to refresh the token
1✔
724
        try:
1✔
725
            value = int(value)
1✔
726
        except:
1✔
727
            return {"message": "Invalid request"}, 400
1✔
728
        if value < 1 or value > maximum_delay:
1✔
729
            return {"message": "invalid_duration", "minimum_duration_min":minimum_delay, "maximum_duration_min": maximum_delay}, 400
1✔
730
        preferences = preferences_db.update_autolock_delay(user_id, value)
1✔
731
        if preferences:# pragma: no cover
732
            return {"message": "Preference updated"}, 201
UNCOV
733
        return {"message": "Unknown error while updating preference"}, 500
×
734
    else: # pragma: no cover
735
        return {"message": "Invalid request"}, 400
736

737

738
@require_valid_user
1✔
739
def delete_google_drive_backup(user_id):
1✔
740
    google_integration = GoogleDriveIntegrationDB()
1✔
741
    token_db = Oauth_tokens_db()
1✔
742
    oauth_tokens = token_db.get_by_user_id(user_id)
1✔
743
    google_drive_option =google_integration.get_by_user_id(user_id) 
1✔
744
    if google_drive_option == None:
1✔
745
        return {"message": "Google drive sync is not enabled"}, 403
1✔
746
    if google_drive_option.isEnabled == 0:
1✔
747
        return {"message": "Google drive sync is not enabled"}, 403
1✔
748
    if not oauth_tokens:
1✔
749
        return {"message": "Google drive sync is not enabled"}, 403
1✔
750
    sse = ServiceSideEncryption()
1✔
751
    try:
1✔
752
        creds_b64 = sse.decrypt( ciphertext=oauth_tokens.enc_credentials, nonce=oauth_tokens.cipher_nonce,  tag=oauth_tokens.cipher_tag)
1✔
753
        credentials = json.loads(base64.b64decode(creds_b64).decode("utf-8"))
1✔
754
        status = google_drive_api.delete_all_backups(credentials=credentials)
1✔
755
        if status :
1✔
756
            return {"message": "Backups deleted"}, 200
1✔
757
        else:
758
            return {"message": "Error while deleting backups"}, 500
1✔
759
    except Exception as e:
1✔
760
        logging.error("Error while deleting backup from google drive " + str(e))
1✔
761
        token_db.delete(user_id)
1✔
762
        GoogleDriveIntegrationDB().update_google_drive_sync(user_id, 0)
1✔
763
        return {"message": "Error while deleting backups"}, 500
1✔
764
    
765

766
@require_passphrase_verification
1✔
767
def delete_account(user_id):
1✔
768
    logging.info("Deleting account for user " + str(user_id))
1✔
769
    user_obj = UserDB().getById(user_id)
1✔
770
    if user_obj.role == "admin":
1✔
771
        return {"message": "Admin cannot be deleted"}, 403
1✔
772
    try: # we try to delete the user backups if possible. If not, this is not a blocking error.
1✔
773
        context = {"user": user_id}
1✔
774
        delete_google_drive_backup(context, user_id, context)
1✔
775
        delete_google_drive_option(context, user_id, context)
1✔
776
    except Exception as e:
1✔
777
        logging.warning("Error while deleting backups for user " + str(user_id) + ". Exception : " + str(e))
1✔
778
    try:
1✔
779
        utils.delete_user_from_database(user_id)
1✔
780
        return {"message": "Account deleted"}, 200
1✔
781
    except Exception as e:
1✔
782
        logging.warning("Error while deleting user from database for user " + str(user_id) + ". Exception : " + str(e))
1✔
783
        return {"message": "Error while deleting account"}, 500
1✔
784
    
785
    
786

787

788
@require_userid
1✔
789
def send_verification_email(user_id):
1✔
790
    if not conf.features.emails.require_email_validation:
1✔
UNCOV
791
        return {"message": "not implemented"}, 501
×
792
    rate_limiting = Rate_Limiting_DB()
1✔
793
    if(rate_limiting.is_send_verification_email_rate_limited(user_id=user_id)):
1✔
794
            return {"message": "Rate limited",  'ban_time':conf.features.rate_limiting.email_ban_time}, 429
1✔
795
    logging.info("Sending verification email to user " + str(user_id))
1✔
796
    user = UserDB().getById(user_id)
1✔
797
    if user == None:
1✔
UNCOV
798
        return {"message": "User not found"}, 404
×
799
    token = utils.generate_new_email_verification_token(user_id=user_id)
1✔
800
    try:
1✔
801
        send_email.send_verification_email(user.mail, token)
1✔
802
        logging.info("Verification email sent to user " + str(user_id))
1✔
803
        ip = utils.get_ip(request=request)
1✔
804
        rate_limiting.add_send_verification_email(ip=ip, user_id=user_id)
1✔
805
        return {"message": "Verification email sent"}, 200
1✔
806
    except Exception as e:
1✔
807
        logging.error("Error while sending verification email to user " + str(user_id) + ". Exception : " + str(e))
1✔
808
        return {"message": "Error while sending verification email"}, 500
1✔
809

810
@require_userid
1✔
811
def verify_email(user_id,body):
1✔
812
    user = UserDB().getById(user_id)
1✔
813
    if user == None:
1✔
UNCOV
814
        return {"message": "generic_errors.user_not_found"}, 404
×
815
    if user.isVerified:
1✔
816
        return {"message": "email_verif.error.already_verified"}, 200
1✔
817
    token_db = EmailVerificationToken_db()
1✔
818
    token_obj = token_db.get_by_user_id(user_id)
1✔
819
    if token_obj == None:
1✔
820
        return {"message": "email_verif.error.no_active_code"}, 403
1✔
821
    if float(token_obj.expiration) < datetime.datetime.utcnow().timestamp():
1✔
822
        token_db.delete(user_id)
1✔
823
        return {"message": "email_verif.error.expired"}, 403
1✔
824
    if int(token_obj.failed_attempts >= 5):
1✔
825
        logging.warning("User " + str(user_id) + " denied verification because of too many failed attempts.")
1✔
826
        return {"message":  "email_verif.error.too_many_failed"}, 403
1✔
827
    if token_obj.token != body["token"]:
1✔
828
        token_db.increase_fail_attempts(user_id)
1✔
829
        logging.warning("User " + str(user_id) + " tried to verify email with wrong token.")
1✔
830
        return {"message": "email_verif.error.failed", "attempt_left":5-(int(token_obj.failed_attempts))}, 403
1✔
831
    token_db.delete(user_id)
1✔
832
    Rate_Limiting_DB().flush_email_verification_limit(user_id)
1✔
833
    user = UserDB().update_email_verification(user_id, True)
1✔
834
    if user:
1✔
835
        return {"message": "Email verified"}, 200
1✔
836
    else:# pragma: no cover
837
        return {"message": "Error while verifying email"}, 500
838

839

840
@require_valid_user
1✔
841
def get_whoami(user_id):
1✔
842
    user = UserDB().getById(user_id)
1✔
843
    return {"username": user.username, "email": user.mail, "id":user_id}, 200
1✔
844

845

846
def get_global_notification():
1✔
847
    notif = Notifications_db().get_last_active_notification()
1✔
848
    if notif is None : 
1✔
849
        return {"display_notification":False}
1✔
850
    if notif.authenticated_user_only:
1✔
851
        return {"display_notification": True, "authenticated_user_only": True}
1✔
852
    else:
853
        return {
1✔
854
            "display_notification": True, 
855
            "authenticated_user_only": False,
856
            "message":notif.message,
857
            "timestamp":float(notif.timestamp)
858
        }
859
    
860
    
861
@require_valid_user
1✔
862
def get_internal_notification(user_id):
1✔
863
    notif = Notifications_db().get_last_active_notification()
1✔
864
    if notif is None : 
1✔
865
        return {"display_notification":False}
1✔
866
    
867
    return {
1✔
868
            "display_notification": True, 
869
            "authenticated_user_only": False,
870
            "message":notif.message,
871
            "timestamp":float(notif.timestamp)
872
        }
873

874

875
# PUT /auth/refresh
876
@ip_rate_limit
1✔
877
def auth_refresh_token(ip, *args, **kwargs):
1✔
878
    session_token = request.cookies.get("session-token")
1✔
879
    refresh_token = request.cookies.get("refresh-token")
1✔
880
    rate_limiting = Rate_Limiting_DB()
1✔
881
    if not session_token or not refresh_token:
1✔
882
        rate_limiting.add_failed_login(ip)
1✔
883
        return {"message": "Missing token"}, 401
1✔
884
    session = SessionTokenRepo().get_session_token(session_token)
1✔
885
    if not session:
1✔
886
        rate_limiting.add_failed_login(ip)
1✔
887
        return {"message": "Invalid token"}, 401
1✔
888
    if session.revoke_timestamp is not None:
1✔
UNCOV
889
        rate_limiting.add_failed_login(ip)
×
UNCOV
890
        return {"message": "Token revoked"}, 401
×
891
    refresh = RefreshToken_db().get_refresh_token_by_hash(sha256(refresh_token.encode("utf-8")).hexdigest())
1✔
892
    if not refresh:
1✔
893
        rate_limiting.add_failed_login(ip, user_id=session.user_id)
1✔
894
        logging.warning(f"Session of user {session.user_id} tried to be refreshed with an refresh token (not present in the db)")
1✔
895
        return {"message": "Access denied"}, 403
1✔
896
    new_session_token, new_refresh_token = refresh_token_func.refresh_token_flow(refresh=refresh, session=session, ip=ip)
1✔
897
    response = Response(status=200, mimetype="application/json", response=json.dumps({"challenge":"ok"}))
1✔
898
    response.set_auth_cookies(new_session_token, new_refresh_token)
1✔
899
    return response
1✔
900

901

902
def health_check():
1✔
UNCOV
903
    health_status = {}
×
UNCOV
904
    if conf.api.node_check_enabled:
×
UNCOV
905
        health_status["node"] = hmac.new(conf.api.node_name_hmac_secret.encode('utf-8'), conf.api.node_name.encode('utf-8'), hashlib.sha256).hexdigest()
×
906
    
UNCOV
907
    try:
×
UNCOV
908
        db.session.execute(text('SELECT 1'))
×
UNCOV
909
        health_status["database"] = "OK"
×
UNCOV
910
    except Exception as e:
×
UNCOV
911
        logging.warning("Database healthcheck failed : " + str(e))
×
UNCOV
912
        health_status["database"] = "NOT OK"
×
UNCOV
913
    health_status["version"] = conf.api.version
×
UNCOV
914
    health_status["build"] = conf.api.build
×
915

UNCOV
916
    if health_status["database"] == "OK":
×
UNCOV
917
        health_status["health"] = "OK"
×
918
    else:
UNCOV
919
        health_status["health"] = "NOT OK"
×
920
    
UNCOV
921
    return health_status, 200
×
922
    
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc