• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SeaweedbrainCY / zero-totp / 16126695263

07 Jul 2025 08:02PM UTC coverage: 92.953% (+0.04%) from 92.918%
16126695263

Pull #270

github

SeaweedbrainCY
Fix test
Pull Request #270: UI improvement

174 of 189 new or added lines in 15 files covered. (92.06%)

139 existing lines in 4 files now uncovered.

12637 of 13595 relevant lines covered (92.95%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.0
api/controllers.py
1
from flask import request, redirect, make_response
1✔
2
from Utils.http_response import Response
1✔
3
import flask
1✔
4
import connexion
1✔
5
import json
1✔
6
from database.user_repo import User as UserDB
1✔
7
from database.zke_repo import ZKE as ZKE_DB
1✔
8
from database.totp_secret_repo import TOTP_secret as TOTP_secretDB
1✔
9
from database.google_drive_integration_repo import GoogleDriveIntegration as GoogleDriveIntegrationDB
1✔
10
from database.preferences_repo import Preferences as PreferencesDB
1✔
11
from database.notif_repo import Notifications as Notifications_db
1✔
12
from database.refresh_token_repo import RefreshTokenRepo as RefreshToken_db
1✔
13
from database.rate_limiting_repo import RateLimitingRepo as Rate_Limiting_DB
1✔
14
from database.session_token_repo import SessionTokenRepo 
1✔
15
from CryptoClasses.hash_func import Bcrypt
1✔
16
from environment import logging, conf
1✔
17
from database.oauth_tokens_repo import Oauth_tokens as Oauth_tokens_db
1✔
18
from CryptoClasses.hash_func import Bcrypt
1✔
19
from Oauth import google_drive_api
1✔
20
import random
1✔
21
import string
1✔
22
from Email import send as send_email
1✔
23
from database.email_verification_repo import EmailVerificationToken as EmailVerificationToken_db
1✔
24
from CryptoClasses.sign_func import API_signature
1✔
25
from CryptoClasses import refresh_token as refresh_token_func
1✔
26
import Oauth.oauth_flow as oauth_flow
1✔
27
import Utils.utils as utils
1✔
28
import os
1✔
29
import base64
1✔
30
import datetime
1✔
31
from Utils.security_wrapper import  require_valid_user, require_passphrase_verification,require_valid_user, require_userid, ip_rate_limit
1✔
32
import traceback
1✔
33
from hashlib import sha256
1✔
34
from CryptoClasses.encryption import ServiceSideEncryption 
1✔
35
from database.db import db
1✔
36
import threading
1✔
37
from uuid import uuid4
1✔
38
import hmac
1✔
39
import hashlib
1✔
40
from sqlalchemy import text
1✔
41
from CryptoClasses.serverRSAKeys import ServerRSAKeys
1✔
42
 
43

44

45

46

47

48

49
if conf.environment.type == "development":
1✔
50
    logging.getLogger().setLevel(logging.INFO)
1✔
51

52

53
# POST /signup
54
def signup():
1✔
55
    if not conf.features.signup_enabled:
1✔
56
        return {"message": "Signup is disabled", "code":"signup_disabled"}, 403
1✔
57
    try:
1✔
58
        data = request.get_json()
1✔
59
        username = utils.sanitize_input(data["username"].strip())
1✔
60
        passphrase = data["password"].strip()
1✔
61
        email = utils.sanitize_input(data["email"].strip())
1✔
62
        derivedKeySalt = utils.sanitize_input(data["derivedKeySalt"].strip())
1✔
63
        zke_key = utils.sanitize_input(data["ZKE_key"].strip())
1✔
64
        passphraseSalt = utils.sanitize_input(data["passphraseSalt"].strip())
1✔
65
    except Exception as e: # pragma: no cover
66
        logging.info(e)
67
        return {"message": "Invalid request"}, 400
68
    
69
    if not username or not passphrase or not email or not derivedKeySalt or not zke_key or not passphraseSalt:
1✔
70
        return {"message": "Missing parameters"}, 400
1✔
71
    if len(username) > 250:
1✔
72
        return {"message": "Username is too long"}, 400
1✔
73
    if not utils.check_email(email) :
1✔
74
        return {"message": "Bad email format"}, 401
1✔
75
    userDB = UserDB()
1✔
76
    user = userDB.getByEmail(email)
1✔
77
    if user:
1✔
UNCOV
78
        return {"message": "User already exists"}, 409
×
79
    check_username = userDB.getByUsername(username)
1✔
80
    if check_username:
1✔
81
        return {"message": "Username already exists"}, 409
1✔
82
    bcrypt = Bcrypt(passphrase)
1✔
83
    try : 
1✔
84
        hashedpw = bcrypt.hashpw()
1✔
85
    except ValueError as e:
1✔
86
        logging.debug(e)
1✔
87
        return {"message": "Password is too long"}, 400
1✔
88
    except Exception as e:
×
UNCOV
89
        logging.warning("Uknown error occured while hashing password" + str(e))
×
UNCOV
90
        return {"message": "Unknown error while hashing your password"}, 500
×
91
    try:
1✔
92
        today = datetime.datetime.now().strftime("%d/%m/%Y")
1✔
93
        user = userDB.create(username=username, email=email, password=hashedpw, randomSalt=derivedKeySalt, isVerified=0,isBlocked=0, passphraseSalt=passphraseSalt, today=today)
1✔
94
    except Exception as e:
×
UNCOV
95
        logging.error("Unknown error while creating user" + str(e))
×
UNCOV
96
        return {"message": "Unknown error while creating user"}, 500
×
97
    if user :
1✔
98
        try:
1✔
99
            zke_db = ZKE_DB()
1✔
100
            zke_key = zke_db.create(user.id, zke_key)
1✔
UNCOV
101
        except Exception as e:
×
UNCOV
102
           zke_key = None
×
103
        if zke_key:
1✔
104
            _,session_token = SessionTokenRepo().generate_session_token(user.id)
1✔
105
            if conf.features.emails.require_email_validation:
1✔
106
                try:
1✔
107
                    send_verification_email(user=user.id, context_={"user":user.id}, token_info={"user":user.id})
1✔
108
                except Exception as e:
1✔
109
                    logging.error("Unknown error while sending verification email" + str(e))
1✔
110
            response = Response(status=201, mimetype="application/json", response=json.dumps({"message": "User created", "email_verification_required":conf.features.emails.require_email_validation}))
1✔
111
            response.set_cookie("session-token", session_token, httponly=True, secure=True, samesite="Lax", max_age=3600)
1✔
112
            return response
1✔
113
        else :
114
            userDB.delete(user.id)
×
UNCOV
115
            logging.error("Unknown error while storing user ZKE keys" + str(username))
×
116
            return {"message": "Unknown error while registering user encrypted keys"}, 500
×
117
    else :
UNCOV
118
        logging.error("Unknown error while creating user" + str(username))
×
UNCOV
119
        return {"message": "Unknown error while creating user"}, 500
×
120

121

122

123
# POST /login
124
@ip_rate_limit
1✔
125
def login(ip, body):
1✔
126
    passphrase = body["password"].strip()
1✔
127
    email = utils.sanitize_input(body["email"]).strip()
1✔
128
    rate_limiting_db = Rate_Limiting_DB()
1✔
129
    if not passphrase or not email:
1✔
130
        return {"message": "generic_errors.missing_params"}, 400
1✔
131
    if(not utils.check_email(email) ):
1✔
132
        return {"message": "generic_errors.bad_email"}, 403
1✔
133
    userDB = UserDB()
1✔
134
    user = userDB.getByEmail(email)
1✔
135
    bcrypt = Bcrypt(passphrase)
1✔
136
    if not user:
1✔
137
        logging.info("User " + str(email) + " tried to login but does not exist. A fake password is checked to avoid timing attacks")
1✔
138
        fakePassword = ''.join(random.choices(string.ascii_letters, k=random.randint(10, 20)))
1✔
139
        bcrypt.checkpw(fakePassword)
1✔
140
        
141
        rate_limiting_db.add_failed_login(ip)
1✔
142
        return {"message": "generic_errors.invalid_creds"}, 403
1✔
143
    logging.info(f"User {user.id} is trying to logging in from gateway {request.remote_addr} and IP {ip}. X-Forwarded-For header is {request.headers.get('X-Forwarded-For')}")
1✔
144
    checked = bcrypt.checkpw(user.password)
1✔
145
    if not checked:
1✔
146
        rate_limiting_db.add_failed_login(ip, user.id)
1✔
147
        return {"message": "generic_errors.invalid_creds"}, 403
1✔
148
    if user.isBlocked: # only authenticated users can see the blocked status
1✔
149
        return {"message": "blocked"}, 403
1✔
150
    
151
    rate_limiting_db.flush_login_limit(ip)
1✔
152
    session_id, session_token = SessionTokenRepo().generate_session_token(user.id)
1✔
153
    refresh_token = refresh_token_func.generate_refresh_token(user.id, session_id)
1✔
154
    if not conf.features.emails.require_email_validation: # we fake the isVerified status if email validation is not required
1✔
UNCOV
155
        response = Response(status=200, mimetype="application/json", response=json.dumps({"username": user.username, "id":user.id, "derivedKeySalt":user.derivedKeySalt, "isGoogleDriveSync": GoogleDriveIntegrationDB().is_google_drive_enabled(user.id), "role":user.role, "isVerified":True}))
×
156
    elif user.isVerified:
1✔
157
        response = Response(status=200, mimetype="application/json", response=json.dumps({"username": user.username, "id":user.id, "derivedKeySalt":user.derivedKeySalt, "isGoogleDriveSync": GoogleDriveIntegrationDB().is_google_drive_enabled(user.id), "role":user.role, "isVerified":user.isVerified}))
1✔
158
    else:
159
        response = Response(status=200, mimetype="application/json", response=json.dumps({"isVerified":user.isVerified}))
1✔
160
    userDB.update_last_login_date(user.id)
1✔
161
    response.set_auth_cookies(session_token, refresh_token)
1✔
162
    return response
1✔
163

164
#POST logout
165
@require_valid_user
1✔
166
def logout(_):
1✔
167
    session_token = request.cookies.get("session-token")
1✔
168
    session_repo = SessionTokenRepo()
1✔
169
    session = session_repo.get_session_token(session_token)
1✔
170
    if not session:
1✔
UNCOV
171
        return {"message": "Session not found"}, 404
×
172
    utils.revoke_session(session_id=session.id)
1✔
173
    response = Response(status=200, mimetype="application/json", response=json.dumps({"message": "Logged out"}))
1✔
174
    response.delete_cookie("session-token")
1✔
175
    response.delete_cookie("refresh-token")
1✔
176
    return response
1✔
177

178

179
#GET /login/specs
180
def get_login_specs(username):
1✔
181
    rate_limiting_db = Rate_Limiting_DB()
1✔
182
    logging.debug("User " + str(username) + " is trying to get login specs")
1✔
183
    ip = utils.get_ip(request)
1✔
184
    if ip:
1✔
185
        if rate_limiting_db.is_login_rate_limited(ip):
1✔
186
            return {"message": "Too many requests", 'ban_time':conf.features.rate_limiting.login_ban_time}, 429
1✔
187
    if(not utils.check_email(username)):
1✔
188
        return {"message": "Bad request"}, 400
1✔
189
    userDB = UserDB()
1✔
190
    user = userDB.getByEmail(username)
1✔
191
    if user :
1✔
192
        return {"passphrase_salt": user.passphraseSalt}, 200
1✔
193
    else :
194
        fake_salt = base64.b64encode(os.urandom(16)).decode("utf-8")
1✔
195
        return {"passphrase_salt": fake_salt}, 200
1✔
196

197
    
198

199
    
200
    
201
#GET /encrypted_secret/{uuid}
202
@require_valid_user
1✔
203
def get_encrypted_secret(user_id, uuid):
1✔
204
    totp_secretDB =  TOTP_secretDB()
1✔
205
    enc_secret = totp_secretDB.get_enc_secret_of_user_by_uuid(user_id, uuid)
1✔
206
    if not enc_secret:
1✔
207
        return {"message": "Forbidden"}, 403
1✔
208
    else:
209
        if enc_secret.user_id == user_id:
1✔
210
            return {"enc_secret": enc_secret.secret_enc}, 200
1✔
211
        else :    
UNCOV
212
            logging.warning("User " + str(user_id) + " tried to access secret " + str(uuid) + " which is not his")
×
UNCOV
213
            return {"message": "Forbidden"}, 403
×
214
        
215
#POST /encrypted_secret
216
@require_valid_user
1✔
217
def add_encrypted_secret(user_id, body):
1✔
218
    enc_secret = utils.sanitize_input(body["enc_secret"]).strip()
1✔
219
    secret_uuid = ""
1✔
220
    totp_secretDB =  TOTP_secretDB()
1✔
221
    while secret_uuid == "":
1✔
222
        tmp_uuid = str(uuid4())
1✔
223
        if not totp_secretDB.get_enc_secret_by_uuid(tmp_uuid):
1✔
224
            secret_uuid = tmp_uuid
1✔
225
        
226
    if totp_secretDB.add(user_id, enc_secret, secret_uuid):
1✔
227
        return {"uuid": secret_uuid}, 201
1✔
228
    else :
UNCOV
229
        logging.warning("Unknown error while adding encrypted secret for user " + str(user_id))
×
UNCOV
230
        return {"message": "Unknown error while adding encrypted secret"}, 500
×
231

232
#PUT /encrypted_secret/{uuid}
233
@require_valid_user
1✔
234
def update_encrypted_secret(user_id,uuid, body):
1✔
235
    enc_secret = body["enc_secret"]
1✔
236
    
237
    totp_secretDB =  TOTP_secretDB()
1✔
238
    totp = totp_secretDB.get_enc_secret_of_user_by_uuid(user_id, uuid)
1✔
239
    if not totp:
1✔
240
        logging.warning("User " + str(user_id) + " tried to update secret " + str(uuid) + " which does not exist")
1✔
241
        return {"message": "Forbidden"}, 403
1✔
242
    else:
243
        if totp.user_id != user_id:
1✔
UNCOV
244
            logging.warning("User " + str(user_id) + " tried to update secret " + str(uuid) + " which is not his")
×
UNCOV
245
            return {"message": "Forbidden"}, 403
×
246
        totp = totp_secretDB.update_secret(uuid=uuid, enc_secret=enc_secret, user_id=user_id)
1✔
247
        if totp == None:
1✔
UNCOV
248
                logging.warning("User " + str(user_id) + " tried to update secret " + str(uuid) + " but an error occurred server side while storing your encrypted secret")
×
UNCOV
249
                return {"message": "An error occurred server side while storing your encrypted secret"}, 500
×
250
        else:
251
                return {"message": "Encrypted secret updated"}, 201
1✔
252

253
#DELETE /encrypted_secret/{uuid}
254
@require_valid_user
1✔
255
def delete_encrypted_secret(user_id,uuid):
1✔
256
    if(uuid == ""):
1✔
UNCOV
257
        return {"message": "Invalid request"}, 400
×
258
    totp_secretDB =  TOTP_secretDB()
1✔
259
    totp = totp_secretDB.get_enc_secret_of_user_by_uuid(user_id, uuid)
1✔
260
    if not totp:
1✔
261
        logging.debug("User " + str(user_id) + " tried to delete secret " + str(uuid) + " which does not exist")
1✔
262
        return {"message": "Forbidden"}, 403
1✔
263
    else:
264
        if totp.user_id != user_id:
1✔
UNCOV
265
            logging.warning("User " + str(user_id) + " tried to delete secret " + str(uuid) + " which is not his")
×
UNCOV
266
            return {"message": "Forbidden"}, 403
×
267
        if totp_secretDB.delete(uuid=uuid, user_id= user_id):
1✔
268
            return {"message": "Encrypted secret deleted"}, 201
1✔
269
        else:
UNCOV
270
            logging.warning("Unknown error while deleting encrypted secret for user " + str(user_id) )
×
UNCOV
271
            return {"message": "Unknown error while deleting encrypted secret"}, 500
×
272
        
273

274
#GET /all_secrets
275
@require_valid_user
1✔
276
def get_all_secrets(user_id):
1✔
277
    totp_secretDB =  TOTP_secretDB()
1✔
278
    enc_secrets = totp_secretDB.get_all_enc_secret_by_user_id(user_id)
1✔
279
    if not enc_secrets:
1✔
280
        return {"message": "No secret found"}, 404
1✔
281
    else:
282
        secrets = []
1✔
283
        for enc_secret in enc_secrets:
1✔
284
           secret = {"uuid": enc_secret.uuid, "enc_secret": enc_secret.secret_enc}
1✔
285
           secrets.append(secret)
1✔
286
        return {"enc_secrets": secrets}, 200
1✔
287

288

289
#GET /zke_encrypted_key
290
@require_valid_user
1✔
291
def get_ZKE_encrypted_key(user_id):
1✔
292
        logging.info(user_id)
1✔
293
        zke_db = ZKE_DB()
1✔
294
        zke_key = zke_db.getByUserId(user_id)
1✔
295
        if zke_key:
1✔
296
                return {"zke_encrypted_key": zke_key.ZKE_key}, 200
1✔
297
        else:
UNCOV
298
            return {"message": "No ZKE key found for this user"}, 404
×
299

300

301

302
#PUT /update/email
303
@require_userid
1✔
304
def update_email(user_id,body):
1✔
305
   
306
    email = utils.sanitize_input(body["email"]).strip()
1✔
307
    if not utils.check_email(email):
1✔
308
        return {"message": "This email doesn't have the right format. Check it and try again"}, 400
1✔
309
         
310
    userDb = UserDB()
1✔
311
    already_existing_user = userDb.getByEmail(email)
1✔
312
    if already_existing_user:
1✔
313
        if already_existing_user.id == user_id:
1✔
314
            return {"message":email},201
1✔
315
        else:
316
            return {"message": "This email already exists"}, 403
1✔
317
    old_mail = userDb.getById(user_id).mail
1✔
318
    user = userDb.update_email(user_id=user_id, email=email, isVerified=0)
1✔
319
    if user:
1✔
320
        try:
1✔
321
            ip = utils.get_ip(request)
1✔
322
            thread = threading.Thread(target=utils.send_information_email,args=(ip, old_mail, "Your email address has been updated"))
1✔
323
            thread.start()
1✔
UNCOV
324
        except Exception as e:
×
UNCOV
325
            logging.error("Unknown error while sending information email" + str(e))
×
326
        if conf.features.emails.require_email_validation:
1✔
327
            try:
1✔
328
           
329
                send_verification_email(user=user_id, context_={"user":user_id}, token_info={"user":user_id})
1✔
330
            except Exception as e:
1✔
331
                logging.error("Unknown error while sending verification email" + str(e))
1✔
332
            return {"message":user.mail},201
1✔
333
        else:
334
            return {"message":user.mail},201
×
335
    else :
UNCOV
336
        logging.warning("An error occured while updating email of user " + str(user_id))
×
UNCOV
337
        return {"message": "Unknown error while updating email"}, 500
×
338

339
#PUT /update/username
340
@require_valid_user
1✔
341
def update_username(user_id,body):
1✔
342
    username = utils.sanitize_input(body["username"].strip())
1✔
343
    if not username:
1✔
UNCOV
344
        return {"message": "generic_errors.missing_params"}, 400
×
345
    userDb = UserDB()
1✔
346
    if len(username) > 250:
1✔
347
        return {"message": "Username is too long"}, 400
1✔
348
    already_existing_user = userDb.getByUsername(username)
1✔
349
    if already_existing_user:
1✔
350
        if already_existing_user.id == user_id:
1✔
351
            return {"message":username},201
1✔
352
        else:
353
            return {"message": "generic_errors.username_exists"}, 409
1✔
354
    user = userDb.update_username(user_id=user_id, username=username)
1✔
355
    if user:
1✔
356
        return {"message":user.username},201
1✔
357
    else :
UNCOV
358
        logging.warning("An error occured while updating username of user " + str(user_id))
×
UNCOV
359
        return {"message": "Unknown error while updating username"}, 500
×
360
   
361
#PUT /update/vault 
362
@require_valid_user
1✔
363
def update_vault(user_id, body):
1✔
364
    returnJson = {"message": "Internal server error", "hashing":-1, "totp":-1, "user":-1, "zke":-1}
1✔
365
    try:
1✔
366
        newPassphrase = body["new_passphrase"].strip()
1✔
367
        old_passphrase = body["old_passphrase"].strip()
1✔
368
        enc_vault = body["enc_vault"].strip()
1✔
369
        enc_vault = json.loads(enc_vault)
1✔
370
        zke_key = body["zke_enc"].strip()
1✔
371
        passphrase_salt = body["passphrase_salt"].strip()
1✔
372
        derivedKeySalt = body["derived_key_salt"].strip()
1✔
373
    except Exception as e:
1✔
374
        logging.error(e)
1✔
375
        return '{"message": "Invalid request"}', 400
1✔
376

377
    if not newPassphrase or not old_passphrase or not zke_key or not passphrase_salt or not derivedKeySalt:
1✔
378
        return {"message": "Missing parameters"}, 400
1✔
379
    
380
    
381
    
382
    is_vault_valid, vault_validation_msg = utils.unsafe_json_vault_validation(enc_vault)
1✔
383
    if not is_vault_valid:
1✔
384
        return {"message": vault_validation_msg}, 400
1✔
385
    userDb = UserDB()
1✔
386
    zke_db = ZKE_DB()
1✔
387
    totp_secretDB = TOTP_secretDB()
1✔
388

389
    user = userDb.getById(user_id)
1✔
390
    bcrypt = Bcrypt(old_passphrase)
1✔
391
    if not bcrypt.checkpw(user.password):
1✔
392
        logging.info("User " + str(user_id) + " tried to update his vault but provided passphrase is wrong.")
1✔
393
        return {"message": "Invalid passphrase"}, 403
1✔
394
    bcrypt = Bcrypt(newPassphrase)
1✔
395
    try :
1✔
396
        hashedpw = bcrypt.hashpw()
1✔
397
    except ValueError as e:
1✔
398
        logging.warning(e)
1✔
399
        return {"message": "passphrase too long. It must be <70 char"}, 400
1✔
400
    except Exception as e:
×
401
        logging.warning("Uknown error occured while hashing password" + str(e))
×
UNCOV
402
        returnJson["hashing"]=0
×
UNCOV
403
        return returnJson, 500
×
404
    
405
    old_vault = TOTP_secretDB().get_all_enc_secret_by_user_id(user_id)
1✔
406
    if len(old_vault) != len(enc_vault):
1✔
407
        logging.warning(f"User {user_id} tried to update his vault but the number of secrets in the new vault is different from the old one. The update is rejected.")
1✔
408
        return {"message": "To avoid the loss of your information, Zero-TOTP is rejecting this request because it has detected that you might lose data. Please contact quickly Zero-TOTP developers to fix issue."}, 400
1✔
409
    for secret in old_vault:
1✔
410
        if secret.uuid not in enc_vault:
1✔
411
            logging.warning(f"FORBIDDEN. The user {user_id} tried to update but the secret {secret.uuid} is missing in the new vault. The update is rejected.")
1✔
412
            return {"message": "Forbidden action. Zero-TOTP detected that you were updating object you don't have access to. The request is rejected."}, 403
1✔
413
    
414

415
    returnJson["hashing"]=1
1✔
416
    errors = 0
1✔
417
    for secret in enc_vault.keys():
1✔
418
        totp = totp_secretDB.get_enc_secret_of_user_by_uuid(user_id, secret)
1✔
419
        if not totp:
1✔
420
            totp = totp_secretDB.add(user_id=user_id, enc_secret=enc_vault[secret], uuid=secret)
×
421
            if not totp:
×
UNCOV
422
                logging.warning("Unknown error while adding encrypted secret for user " + str(user_id))
×
UNCOV
423
                errors = 1
×
424
        else:
425
            if totp.user_id != user_id:
1✔
UNCOV
426
                logging.warning("User " + str(user_id) + " tried to update secret " + str(secret) + " which is not his")
×
UNCOV
427
                errors = 1
×
428
            else :
429
                totp = totp_secretDB.update_secret(uuid=secret, enc_secret=enc_vault[secret], user_id=user_id)
1✔
430
                if totp == None:
1✔
UNCOV
431
                    logging.warning("User " + str(user_id) + " tried to update secret " + str(secret) + " but an error occurred server side while storing your  encrypted secret")
×
UNCOV
432
                    errors = 1
×
433
    zke = zke_db.update(user_id, zke_key)
1✔
434
    userUpdated = userDb.update(user_id=user_id, passphrase=hashedpw, passphrase_salt=passphrase_salt, derivedKeySalt=derivedKeySalt)
1✔
435
    returnJson["totp"]=1 if errors == 0 else 0
1✔
436
    returnJson["user"]=1 if userUpdated else 0
1✔
437
    returnJson["zke"]=1 if zke else 0
1✔
438
    if errors == 0 and userUpdated and zke:
1✔
439
        try:
1✔
440
            ip = utils.get_ip(request)
1✔
441
            thread = threading.Thread(target=utils.send_information_email,args=(ip, user.mail, "Your vault passphrase has been updated"))
1✔
442
            thread.start()
1✔
UNCOV
443
        except Exception as e:
×
UNCOV
444
            logging.error("Unknown error while sending information email" + str(e))
×
445
        return {"message": "Vault updated"}, 201
1✔
446
    else:
UNCOV
447
        logging.warning("An error occured while updating passphrase of user " + str(user_id))
×
UNCOV
448
        return returnJson, 500
×
449

450

451
@require_valid_user
1✔
452
def export_vault(user_id):
1✔
453
    
454
    vault = {"version":1, "date": str(datetime.datetime.utcnow())}
1✔
455
    user = UserDB().getById(user_id=user_id)
1✔
456
    zkeKey = ZKE_DB().getByUserId(user_id=user_id)
1✔
457
    totp_secrets_list = TOTP_secretDB().get_all_enc_secret_by_user_id(user_id=user_id)
1✔
458
    if not user or not zkeKey:
1✔
UNCOV
459
        return {"message" : "User not found"}, 404
×
460
    
461
    vault["derived_key_salt"] = user.derivedKeySalt
1✔
462
    vault["zke_key_enc"] = zkeKey.ZKE_key
1✔
463
    secrets = utils.get_all_secrets_sorted(totp_secrets_list)
1✔
464
    vault["secrets"] = secrets
1✔
465
    vault["secrets_sha256sum"] = sha256(json.dumps(vault["secrets"],  sort_keys=True).encode("utf-8")).hexdigest()
1✔
466
    vault_b64 = base64.b64encode(json.dumps(vault).encode("utf-8")).decode("utf-8")
1✔
467
    signature = API_signature().sign_rsa(vault_b64)
1✔
468
    vault = vault_b64 + "," + signature
1✔
469
    return vault, 200
1✔
470

471
# GET /role
472
@require_userid
1✔
473
def get_role(user_id, *args, **kwargs):
1✔
474
    user = UserDB().getById(user_id=user_id)
1✔
475
    if not user:
1✔
UNCOV
476
        return {"message" : "User not found"}, 404
×
477
    elif not user.isVerified and conf.features.emails.require_email_validation:
1✔
478
        return {"role" : "not_verified"}, 200
1✔
479
    return {"role": user.role}, 200
1✔
480

481

482
    
483
# GET /google-drive/oauth/authorization-flow
484
def get_authorization_flow():
1✔
485
    if not conf.features.google_drive.enabled:
1✔
486
        return {"message": "Oauth is disabled on this tenant. Contact the tenant administrator to enable it."}, 403 
1✔
487
    authorization_url, state = oauth_flow.get_authorization_url()
1✔
488
    flask.session["state"] = state
1✔
489
    logging.info("State stored in session : " + str(state))
1✔
490
    return {"authorization_url": authorization_url, "state":state}, 200
1✔
491

492
# GET /google-drive/oauth/callback
493
@require_valid_user
1✔
494
def oauth_callback(user_id):
1✔
495
    if not conf.features.google_drive.enabled:
1✔
UNCOV
496
        return {"message": "Oauth is disabled on this tenant. Contact the tenant administrator to enable it."}, 403
×
497
    frontend_URI = conf.environment.frontend_URI
1✔
498
    try: 
1✔
499
        credentials = oauth_flow.get_credentials(request.url, flask.session["state"])
1✔
500

501
        if credentials == None:
1✔
502
            response = make_response(redirect(frontend_URI + "/oauth/callback?status=error&state="+str(flask.session["state"]),  code=302))
1✔
503
            flask.session.pop("state")
1✔
504
            return response
1✔
505

506
        response = make_response(redirect(frontend_URI + "/oauth/callback?status=success&state="+str(flask.session["state"]),    code=302))
1✔
507
        creds_b64 = base64.b64encode(json.dumps(credentials).encode("utf-8")).decode("utf-8")
1✔
508
        sse = ServiceSideEncryption()
1✔
509
        encrypted_cipher = sse.encrypt(creds_b64)
1✔
510
        expires_at = int(datetime.datetime.strptime(credentials["expiry"], "%Y-%m-%d %H:%M:%S.%f").timestamp())
1✔
511
        token_db = Oauth_tokens_db()
1✔
512
        tokens = token_db.get_by_user_id(user_id)
1✔
513
        if tokens:
1✔
514
            tokens = token_db.update(user_id=user_id, enc_credentials=encrypted_cipher["ciphertext"] ,expires_at=expires_at, nonce=encrypted_cipher["nonce"], tag=encrypted_cipher["tag"])
1✔
515
        else:
516
            tokens = token_db.add(user_id=user_id, enc_credentials=encrypted_cipher["ciphertext"], expires_at=expires_at, nonce=encrypted_cipher["nonce"], tag=encrypted_cipher["tag"])
1✔
517
        if tokens:
1✔
518
            google_drive_int = GoogleDriveIntegrationDB()
1✔
519
            integration = google_drive_int.get_by_user_id(user_id)
1✔
520
            if integration == None:
1✔
521
                google_drive_int.create(user_id=user_id, google_drive_sync=1)
1✔
522
            else :
523
                google_drive_int.update_google_drive_sync(user_id=user_id, google_drive_sync=1)
1✔
524
            flask.session.pop("state")
1✔
525
            return response
1✔
526
        else:
527
            logging.warning("Unknown error while storing encrypted tokens for user " + str(user_id))
1✔
528
            response = make_response(redirect(frontend_URI + "/oauth/callback?status=error&state="+flask.session.get('state'),  code=302))
1✔
529
            flask.session.pop("state")
1✔
530
            return response
1✔
531
    except oauth_flow.NoRefreshTokenError as e:
1✔
UNCOV
532
        logging.warning(f"Oauth callback for user {user_id} failed because no refresh token was provided.")
×
UNCOV
533
        return make_response(redirect(frontend_URI + "/oauth/callback?status=refresh-token-error&state=none",  code=302))
×
534
    except Exception as e:
1✔
535
        logging.error("Error while exchanging the authorization code " + str(e))
1✔
536
        logging.error(traceback.format_exc())
1✔
537
        if flask.session.get("state"):
1✔
538
            response = make_response(redirect(frontend_URI + "/oauth/callback?status=error&state="+flask.session.get('state'),  code=302))
1✔
539
            flask.session.pop("state")
1✔
540
        else :
541
            response = make_response(redirect(frontend_URI + "/oauth/callback?status=error&state=none",  code=302))
1✔
542
        return response
1✔
543

544

545

546
#GET /google-drive/option
547
@require_valid_user
1✔
548
def get_google_drive_option(user_id):
1✔
549
    if not conf.features.google_drive.enabled:
1✔
UNCOV
550
        return {"message": "Google drive sync is not enabled on this tenant."}, 403
×
551
    google_drive_integrations = GoogleDriveIntegrationDB()
1✔
552
    status = google_drive_integrations.is_google_drive_enabled(user_id)
1✔
553
    if status:
1✔
554
        return {"status": "enabled"}, 200
1✔
555
    else:
556
        return {"status": "disabled"}, 200
1✔
557
    
558
#PUT /google-drive/backup
559
@require_valid_user
1✔
560
def backup_to_google_drive(user_id, *args, **kwargs):
1✔
561
    if not conf.features.google_drive.enabled:
1✔
562
        return {"message": "Google drive sync is not enabled on this tenant."}, 403
1✔
563
    
564
    token_db = Oauth_tokens_db()
1✔
565
    oauth_tokens = token_db.get_by_user_id(user_id)
1✔
566
    google_drive_integrations = GoogleDriveIntegrationDB()
1✔
567

568
    if not oauth_tokens or not google_drive_integrations.is_google_drive_enabled(user_id):
1✔
569
        return {"message": "Google drive sync is not enabled"}, 403
1✔
570
    sse = ServiceSideEncryption()
1✔
571
    creds_b64 = sse.decrypt( ciphertext=oauth_tokens.enc_credentials, nonce=oauth_tokens.cipher_nonce, tag=oauth_tokens.cipher_tag)
1✔
572
    if creds_b64 == None:
1✔
573
        logging.warning("Error while decrypting credentials for user " + str(user_id))
1✔
574
        return {"message": "Error while decrypting credentials"}, 500
1✔
575
    credentials = json.loads(base64.b64decode(creds_b64).decode("utf-8"))
1✔
576
    try:
1✔
577
        exported_vault,_ = export_vault(user=user_id, context_={"user":user_id}, token_info={"user":user_id})
1✔
578
        google_drive_api.backup(credentials=credentials, vault=exported_vault)
1✔
579
        google_drive_api.clean_backup_retention(credentials=credentials, user_id=user_id)
1✔
580
        return {"message": "Backup done"}, 201
1✔
581
    except Exception as e:
1✔
582
        logging.error("Error while backing up to google drive " + str(e))
1✔
583
        return {"message": "Error while backing up to google drive"}, 500
1✔
584

585

586
# GET /google-drive/last-backup/verify
587
@require_valid_user
1✔
588
def verify_last_backup(user_id):
1✔
589
    if not conf.features.google_drive.enabled:
1✔
590
        return {"message": "Google drive sync is not enabled on this tenant."}, 403
1✔
591
    token_db = Oauth_tokens_db()
1✔
592
    oauth_tokens = token_db.get_by_user_id(user_id)
1✔
593
    google_drive_integrations = GoogleDriveIntegrationDB()
1✔
594
    if not oauth_tokens or not google_drive_integrations.is_google_drive_enabled(user_id):
1✔
595
        return {"message": "Google drive sync is not enabled"}, 403
1✔
596
    sse = ServiceSideEncryption()
1✔
597
    creds_b64 = sse.decrypt( ciphertext=oauth_tokens.enc_credentials, nonce=oauth_tokens.cipher_nonce, tag=oauth_tokens.cipher_tag)
1✔
598
    if creds_b64 == None:
1✔
599
        logging.error("Error while decrypting credentials for user " + str(user_id) + ". creds_b64 = " + str(creds_b64))
1✔
600
        return {"error": "Error while connecting to the Google API"}, 500
1✔
601
    
602
    
603
    credentials = json.loads(base64.b64decode(creds_b64).decode("utf-8"))
1✔
604
    if credentials.get("refresh_token") is None:
1✔
UNCOV
605
        logging.warning(f"User {user_id} tried to verify last backup but no refresh token was found in the credentials. This is a blocking error.")
×
UNCOV
606
        return {"message": "Error while connecting to the Google API", "error_id": "3c071611-744a-4c93-95c8-c87ee3fce00d"}, 400
×
607
    try:
1✔
608
        last_backup_checksum, last_backup_date = google_drive_api.get_last_backup_checksum(credentials)
1✔
609
    except utils.CorruptedFile as e:
1✔
610
        logging.warning("Error while getting last backup checksum " + str(e))
1✔
611
        return {"status": "corrupted_file"}, 200
1✔
612
    except utils.FileNotFound as e:
1✔
613
        logging.warning("Error while getting last backup checksum " + str(e))
1✔
614
        return {"error": "file_not_found"}, 404
1✔
615
    totp_secrets_list = TOTP_secretDB().get_all_enc_secret_by_user_id(user_id=user_id)
1✔
616
    secrets = utils.get_all_secrets_sorted(totp_secrets_list)
1✔
617
    sha256sum = sha256(json.dumps(secrets,  sort_keys=True).encode("utf-8")).hexdigest()
1✔
618
    if last_backup_checksum == sha256sum:
1✔
619
        google_drive_api.clean_backup_retention(credentials=credentials, user_id=user_id)
1✔
620
        return {"status": "ok", "is_up_to_date": True, "last_backup_date": last_backup_date }, 200
1✔
621
    else:
622
        return {"status": "ok", "is_up_to_date": False, "last_backup_date": "" }, 200
1✔
623

624

625
# DELETE /google-drive/option
626
@require_valid_user
1✔
627
def delete_google_drive_option(user_id):
1✔
628
    if not conf.features.google_drive.enabled:
1✔
UNCOV
629
        return {"message": "Google drive sync is not enabled on this tenant."}, 403
×
630
    google_integration = GoogleDriveIntegrationDB()
1✔
631
    token_db = Oauth_tokens_db()
1✔
632
    oauth_tokens = token_db.get_by_user_id(user_id)
1✔
633
    
634
    if google_integration.get_by_user_id(user_id) is None:
1✔
635
        google_integration.create(user_id, 0)
1✔
636
    if not oauth_tokens:
1✔
637
        GoogleDriveIntegrationDB().update_google_drive_sync(user_id, 0)
1✔
638
        return {"message": "Google drive sync is not enabled"}, 200
1✔
639
    sse = ServiceSideEncryption()
1✔
640
    try:
1✔
641
        creds_b64 = sse.decrypt( ciphertext=oauth_tokens.enc_credentials, nonce=oauth_tokens.cipher_nonce,  tag=oauth_tokens.cipher_tag)
1✔
642
        if creds_b64 == None:
1✔
643
            token_db.delete(user_id)
1✔
644
            GoogleDriveIntegrationDB().update_google_drive_sync(user_id, 0)
1✔
645
            return {"message": "Error while decrypting credentials"}, 200
1✔
646
        credentials = json.loads(base64.b64decode(creds_b64).decode("utf-8"))
1✔
647
        google_drive_api.revoke_credentials(credentials)
1✔
648
        token_db.delete(user_id)
1✔
649
        GoogleDriveIntegrationDB().update_google_drive_sync(user_id, 0)
1✔
650
        return {"message": "Google drive sync disabled"}, 200
1✔
651
    except Exception as e:
1✔
652
        logging.error("Error while deleting backup from google drive " + str(e))
1✔
653
        token_db.delete(user_id)
1✔
654
        GoogleDriveIntegrationDB().update_google_drive_sync(user_id, 0)
1✔
655
        return {"message": "Error while revoking credentials"}, 200
1✔
656

657
@require_valid_user
1✔
658
def get_preferences(user_id,fields):
1✔
659
    valid_fields = [ "favicon_policy", "derivation_iteration", "backup_lifetime", "backup_minimum", "autolock_delay"]
1✔
660
    all_field = fields == "all" 
1✔
661
    fields_asked = []
1✔
662
    if not all_field:
1✔
663
        fields = fields.split(",")
1✔
664
        for field in fields:
1✔
665
            if field not in fields_asked:
1✔
666
                for valid_field in valid_fields:
1✔
667
                    if field == valid_field:
1✔
668
                        fields_asked.append(valid_field)
1✔
669

670
        if len(fields_asked) == 0:
1✔
671
            return {"message": "Invalid request"}, 400
1✔
672
    
673
    user_preferences = {}
1✔
674
    preferences_db = PreferencesDB()
1✔
675
    preferences = preferences_db.get_preferences_by_user_id(user_id)
1✔
676
    if "favicon_policy" in fields_asked or all_field:
1✔
677
        user_preferences["favicon_policy"] = preferences.favicon_preview_policy
1✔
678
    if  "derivation_iteration" in fields_asked or all_field:
1✔
679
        user_preferences["derivation_iteration"] = preferences.derivation_iteration
1✔
680
    if "backup_lifetime" in fields_asked or all_field:
1✔
681
        user_preferences["backup_lifetime"] = preferences.backup_lifetime
1✔
682
    if "backup_minimum" in fields_asked or all_field:
1✔
683
        user_preferences["backup_minimum"] = preferences.minimum_backup_kept
1✔
684
    if "autolock_delay" in fields_asked or all_field:
1✔
685
        user_preferences["autolock_delay"] = preferences.vault_autolock_delay_min
1✔
686
    return user_preferences, 200
1✔
687

688

689
@require_valid_user
1✔
690
def set_preference(user_id, body):
1✔
691
    field = body["id"]
1✔
692
    value = body["value"]
1✔
693
    
694
    valid_fields = [ "favicon_policy", "derivation_iteration", "backup_lifetime", "backup_minimum", "autolock_delay"]
1✔
695
    if field not in valid_fields:
1✔
696
        return {"message": "Invalid request"}, 400
1✔
697
    preferences_db = PreferencesDB()
1✔
698
    if field == "favicon_policy":
1✔
699
        if value not in ["always", "never", "enabledOnly"]:
1✔
700
            return {"message": "Invalid request"}, 400
1✔
701
        preferences = preferences_db.update_favicon(user_id, value)
1✔
702
        if preferences:
1✔
703
            return {"message": "Preference updated"}, 201
1✔
704
        else:# pragma: no cover
705
            return {"message": "Unknown error while updating preference"}, 500
706
    elif field == "derivation_iteration":
1✔
707
        try:
1✔
708
            value = int(value)
1✔
709
        except:
1✔
710
            return {"message": "Invalid request"}, 400
1✔
711
        if value < 1000 or value > 1000000:
1✔
712
            return {"message": "iteration must be between 1000 and 1000000 "}, 400
1✔
713
        preferences = preferences_db.update_derivation_iteration(user_id, value)
1✔
714
        if preferences:
1✔
715
            return {"message": "Preference updated"}, 201
1✔
716
        else:# pragma: no cover
717
            return {"message": "Unknown error while updating preference"}, 500
718
    elif field == "backup_lifetime":
1✔
719
        try:
1✔
720
            value = int(value)
1✔
721
        except:
1✔
722
            return {"message": "Invalid request"}, 400
1✔
723
        if value < 1 :
1✔
724
            return {"message": "backup lifetime must be at least day"}, 400
1✔
725
        preferences = preferences_db.update_backup_lifetime(user_id, value)
1✔
726
        if preferences:
1✔
727
            return {"message": "Preference updated"}, 201
1✔
728
        else:# pragma: no cover
729
            return {"message": "Unknown error while updating preference"}, 500
730
    elif field == "backup_minimum":
1✔
731
        try:
1✔
732
            value = int(value)
1✔
733
        except:
1✔
734
            return {"message": "Invalid request"}, 400
1✔
735
        if value < 1 :
1✔
736
            return {"message": "minimum backup kept must be at least of 1"}, 400
1✔
737
        preferences = preferences_db.update_minimum_backup_kept(user_id, value)
1✔
738
        if preferences:
1✔
739
            return {"message": "Preference updated"}, 201
1✔
740
        else:# pragma: no cover
741
            return {"message": "Unknown error while updating preference"}, 500
742
    elif field == "autolock_delay":
1✔
743
        minimum_delay = 1
1✔
744
        maximum_delay = conf.api.refresh_token_validity/60 # autolock is limited by the ability to refresh the token
1✔
745
        try:
1✔
746
            value = int(value)
1✔
747
        except:
1✔
748
            return {"message": "Invalid request"}, 400
1✔
749
        if value < 1 or value > maximum_delay:
1✔
750
            return {"message": "invalid_duration", "minimum_duration_min":minimum_delay, "maximum_duration_min": maximum_delay}, 400
1✔
751
        preferences = preferences_db.update_autolock_delay(user_id, value)
1✔
752
        if preferences:# pragma: no cover
753
            return {"message": "Preference updated"}, 201
UNCOV
754
        return {"message": "Unknown error while updating preference"}, 500
×
755
    else: # pragma: no cover
756
        return {"message": "Invalid request"}, 400
757

758
# DELETE /google-drive/backup
759
@require_valid_user
1✔
760
def delete_google_drive_backup(user_id):
1✔
761
    if not conf.features.google_drive.enabled:
1✔
762
        return {"message": "Google drive sync is not enabled on this tenant."}, 403
1✔
763
    google_integration = GoogleDriveIntegrationDB()
1✔
764
    token_db = Oauth_tokens_db()
1✔
765
    oauth_tokens = token_db.get_by_user_id(user_id)
1✔
766
    google_drive_option =google_integration.get_by_user_id(user_id) 
1✔
767
    if google_drive_option == None:
1✔
768
        return {"message": "Google drive sync is not enabled"}, 403
1✔
769
    if google_drive_option.isEnabled == 0:
1✔
770
        return {"message": "Google drive sync is not enabled"}, 403
1✔
771
    if not oauth_tokens:
1✔
772
        return {"message": "Google drive sync is not enabled"}, 403
1✔
773
    sse = ServiceSideEncryption()
1✔
774
    try:
1✔
775
        creds_b64 = sse.decrypt( ciphertext=oauth_tokens.enc_credentials, nonce=oauth_tokens.cipher_nonce,  tag=oauth_tokens.cipher_tag)
1✔
776
        credentials = json.loads(base64.b64decode(creds_b64).decode("utf-8"))
1✔
777
        status = google_drive_api.delete_all_backups(credentials=credentials)
1✔
778
        if status :
1✔
779
            return {"message": "Backups deleted"}, 200
1✔
780
        else:
781
            return {"message": "Error while deleting backups"}, 500
1✔
782
    except Exception as e:
1✔
783
        logging.error("Error while deleting backup from google drive " + str(e))
1✔
784
        token_db.delete(user_id)
1✔
785
        GoogleDriveIntegrationDB().update_google_drive_sync(user_id, 0)
1✔
786
        return {"message": "Error while deleting backups"}, 500
1✔
787
    
788

789
@require_passphrase_verification
1✔
790
def delete_account(user_id):
1✔
791
    logging.info("Deleting account for user " + str(user_id))
1✔
792
    user_obj = UserDB().getById(user_id)
1✔
793
    if user_obj.role == "admin":
1✔
794
        return {"message": "Admin cannot be deleted"}, 403
1✔
795
    try: # we try to delete the user backups if possible. If not, this is not a blocking error.
1✔
796
        context = {"user": user_id}
1✔
797
        delete_google_drive_backup(context, user_id, context)
1✔
798
        delete_google_drive_option(context, user_id, context)
1✔
799
    except Exception as e:
1✔
800
        logging.warning("Error while deleting backups for user " + str(user_id) + ". Exception : " + str(e))
1✔
801
    try:
1✔
802
        utils.delete_user_from_database(user_id)
1✔
803
        return {"message": "Account deleted"}, 200
1✔
804
    except Exception as e:
1✔
805
        logging.warning("Error while deleting user from database for user " + str(user_id) + ". Exception : " + str(e))
1✔
806
        return {"message": "Error while deleting account"}, 500
1✔
807
    
808
    
809

810

811
@require_userid
1✔
812
def send_verification_email(user_id):
1✔
813
    if not conf.features.emails.require_email_validation:
1✔
UNCOV
814
        return {"message": "not implemented"}, 501
×
815
    rate_limiting = Rate_Limiting_DB()
1✔
816
    if(rate_limiting.is_send_verification_email_rate_limited(user_id=user_id)):
1✔
817
            return {"message": "Rate limited",  'ban_time':conf.features.rate_limiting.email_ban_time}, 429
1✔
818
    logging.info("Sending verification email to user " + str(user_id))
1✔
819
    user = UserDB().getById(user_id)
1✔
820
    if user == None:
1✔
UNCOV
821
        return {"message": "User not found"}, 404
×
822
    token = utils.generate_new_email_verification_token(user_id=user_id)
1✔
823
    try:
1✔
824
        send_email.send_verification_email(user.mail, token)
1✔
825
        logging.info("Verification email sent to user " + str(user_id))
1✔
826
        ip = utils.get_ip(request=request)
1✔
827
        rate_limiting.add_send_verification_email(ip=ip, user_id=user_id)
1✔
828
        return {"message": "Verification email sent"}, 200
1✔
829
    except Exception as e:
1✔
830
        logging.error("Error while sending verification email to user " + str(user_id) + ". Exception : " + str(e))
1✔
831
        return {"message": "Error while sending verification email"}, 500
1✔
832

833
@require_userid
1✔
834
def verify_email(user_id,body):
1✔
835
    user = UserDB().getById(user_id)
1✔
836
    if user == None:
1✔
UNCOV
837
        return {"message": "generic_errors.user_not_found"}, 404
×
838
    if user.isVerified:
1✔
839
        return {"message": "email_verif.error.already_verified"}, 200
1✔
840
    token_db = EmailVerificationToken_db()
1✔
841
    token_obj = token_db.get_by_user_id(user_id)
1✔
842
    if token_obj == None:
1✔
843
        return {"message": "email_verif.error.no_active_code"}, 403
1✔
844
    if datetime.datetime.now(tz=datetime.timezone.utc).timestamp() > float(token_obj.expiration) :
1✔
845
        token_db.delete(user_id)
1✔
846
        return {"message": "email_verif.error.expired"}, 403
1✔
847
    if int(token_obj.failed_attempts >= 5):
1✔
848
        logging.warning("User " + str(user_id) + " denied verification because of too many failed attempts.")
1✔
849
        return {"message":  "email_verif.error.too_many_failed"}, 403
1✔
850
    if token_obj.token != body["token"]:
1✔
851
        token_db.increase_fail_attempts(user_id)
1✔
852
        logging.warning("User " + str(user_id) + " tried to verify email with wrong token.")
1✔
853
        return {"message": "email_verif.error.failed", "attempt_left":5-(int(token_obj.failed_attempts))}, 403
1✔
854
    token_db.delete(user_id)
1✔
855
    Rate_Limiting_DB().flush_email_verification_limit(user_id)
1✔
856
    user = UserDB().update_email_verification(user_id, True)
1✔
857
    if user:
1✔
858
        return {"message": "Email verified"}, 200
1✔
859
    else:# pragma: no cover
860
        return {"message": "Error while verifying email"}, 500
861

862

863
@require_valid_user
1✔
864
def get_whoami(user_id):
1✔
865
    user = UserDB().getById(user_id)
1✔
866
    return {"username": user.username, "email": user.mail, "id":user_id}, 200
1✔
867

868

869
def get_global_notification():
1✔
870
    notif = Notifications_db().get_last_active_notification()
1✔
871
    if notif is None : 
1✔
872
        return {"display_notification":False}
1✔
873
    if notif.authenticated_user_only:
1✔
874
        return {"display_notification": True, "authenticated_user_only": True}
1✔
875
    else:
876
        return {
1✔
877
            "display_notification": True, 
878
            "authenticated_user_only": False,
879
            "message":notif.message,
880
            "timestamp":float(notif.timestamp)
881
        }
882
    
883
    
884
@require_valid_user
1✔
885
def get_internal_notification(user_id):
1✔
886
    notif = Notifications_db().get_last_active_notification()
1✔
887
    if notif is None : 
1✔
888
        return {"display_notification":False}
1✔
889
    
890
    return {
1✔
891
            "display_notification": True, 
892
            "authenticated_user_only": False,
893
            "message":notif.message,
894
            "timestamp":float(notif.timestamp)
895
        }
896

897

898
# PUT /auth/refresh
899
@ip_rate_limit
1✔
900
def auth_refresh_token(ip, *args, **kwargs):
1✔
901
    session_token = request.cookies.get("session-token")
1✔
902
    refresh_token = request.cookies.get("refresh-token")
1✔
903
    rate_limiting = Rate_Limiting_DB()
1✔
904
    if not session_token or not refresh_token:
1✔
905
        rate_limiting.add_failed_login(ip)
1✔
906
        return {"message": "Missing token"}, 401
1✔
907
    session = SessionTokenRepo().get_session_token(session_token)
1✔
908
    if not session:
1✔
909
        rate_limiting.add_failed_login(ip)
1✔
910
        return {"message": "Invalid token"}, 401
1✔
911
    if session.revoke_timestamp is not None:
1✔
912
        rate_limiting.add_failed_login(ip)
×
913
        return {"message": "Token revoked"}, 401
×
914
    refresh = RefreshToken_db().get_refresh_token_by_hash(sha256(refresh_token.encode("utf-8")).hexdigest())
1✔
915
    if not refresh:
1✔
916
        rate_limiting.add_failed_login(ip, user_id=session.user_id)
1✔
917
        logging.warning(f"Session of user {session.user_id} tried to be refreshed with an refresh token (not present in the db)")
1✔
918
        return {"message": "Access denied"}, 403
1✔
919
    new_session_token, new_refresh_token = refresh_token_func.refresh_token_flow(refresh=refresh, session=session, ip=ip)
1✔
920
    response = Response(status=200, mimetype="application/json", response=json.dumps({"challenge":"ok"}))
1✔
921
    response.set_auth_cookies(new_session_token, new_refresh_token)
1✔
922
    return response
1✔
923

924
# GET /healthcheck
925
def health_check():
1✔
UNCOV
926
    health_status = {}
×
UNCOV
927
    global_healthy = True
×
UNCOV
928
    if conf.api.node_check_enabled:
×
UNCOV
929
        health_status["node"] = hmac.new(conf.api.node_name_hmac_secret.encode('utf-8'), conf.api.node_name.encode('utf-8'), hashlib.sha256).hexdigest()
×
930
    
UNCOV
931
    try:
×
UNCOV
932
        db.session.execute(text('SELECT 1'))
×
UNCOV
933
        health_status["database"] = "OK"
×
UNCOV
934
    except Exception as e:
×
UNCOV
935
        logging.warning("Database healthcheck failed : " + str(e))
×
UNCOV
936
        health_status["database"] = "NOT OK"
×
UNCOV
937
        global_healthy = False
×
UNCOV
938
    health_status["version"] = conf.api.version
×
UNCOV
939
    health_status["build"] = conf.api.build
×
940

UNCOV
941
    health_status["health"] = "OK" if global_healthy else "NOT OK"
×
UNCOV
942
    http_status = 200 if global_healthy else 500
×
943
    
UNCOV
944
    return health_status, http_status
×
945
    
946

947
# GET /vault/signature/public-key
948
def get_public_key():
1✔
949
    with open(conf.api.public_key_path, "r") as f:
1✔
950
        public_key = f.read()
1✔
951
    if ServerRSAKeys().validate_rsa_public_key(public_key):
1✔
952
        return {"public_key": public_key}, 200
1✔
953
    else:
954
        logging.error("This is a critical error from get_public_key(). A public key has been requested but the key is not valid. An error as been returned to the user.")
1✔
955
        return {"message": "Invalid"}, 403
1✔
956
    
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc