• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SeaweedbrainCY / zero-totp / 18141507884

30 Sep 2025 07:40PM UTC coverage: 92.962% (+0.009%) from 92.953%
18141507884

push

github

SeaweedbrainCY
resolve conflict

70 of 78 new or added lines in 5 files covered. (89.74%)

59 existing lines in 4 files now uncovered.

12759 of 13725 relevant lines covered (92.96%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.03
api/controllers.py
1
from flask import request, redirect, make_response
1✔
2
from Utils.http_response import Response
1✔
3
import flask
1✔
4
import connexion
1✔
5
import json
1✔
6
from database.user_repo import User as UserDB
1✔
7
from database.zke_repo import ZKE as ZKE_DB
1✔
8
from database.totp_secret_repo import TOTP_secret as TOTP_secretDB
1✔
9
from database.google_drive_integration_repo import GoogleDriveIntegration as GoogleDriveIntegrationDB
1✔
10
from database.preferences_repo import Preferences as PreferencesDB
1✔
11
from database.notif_repo import Notifications as Notifications_db
1✔
12
from database.refresh_token_repo import RefreshTokenRepo as RefreshToken_db
1✔
13
from database.rate_limiting_repo import RateLimitingRepo as Rate_Limiting_DB
1✔
14
from database.session_token_repo import SessionTokenRepo 
1✔
15
from CryptoClasses.hash_func import Bcrypt
1✔
16
from environment import logging, conf
1✔
17
from database.oauth_tokens_repo import Oauth_tokens as Oauth_tokens_db
1✔
18
from CryptoClasses.hash_func import Bcrypt
1✔
19
from Oauth import google_drive_api
1✔
20
import random
1✔
21
import string
1✔
22
from Email import send as send_email
1✔
23
from database.email_verification_repo import EmailVerificationToken as EmailVerificationToken_db
1✔
24
from CryptoClasses.sign_func import API_signature
1✔
25
from CryptoClasses import refresh_token as refresh_token_func
1✔
26
import Oauth.oauth_flow as oauth_flow
1✔
27
import Utils.utils as utils
1✔
28
import os
1✔
29
import base64
1✔
30
import datetime
1✔
31
from Utils.security_wrapper import  require_valid_user, require_passphrase_verification,require_valid_user, require_userid, ip_rate_limit
1✔
32
import traceback
1✔
33
from hashlib import sha256
1✔
34
from CryptoClasses.encryption import ServiceSideEncryption 
1✔
35
from database.db import db
1✔
36
import threading
1✔
37
from uuid import uuid4
1✔
38
import hmac
1✔
39
import hashlib
1✔
40
from sqlalchemy import text
1✔
41
from CryptoClasses.serverRSAKeys import ServerRSAKeys
1✔
42
 
43

44

45

46

47

48

49
if conf.environment.type == "development":
1✔
50
    logging.getLogger().setLevel(logging.INFO)
1✔
51

52

53
# POST /signup
54
def signup():
1✔
55
    if not conf.features.signup_enabled:
1✔
56
        return {"message": "Signup is disabled", "code":"signup_disabled"}, 403
1✔
57
    try:
1✔
58
        data = request.get_json()
1✔
59
        username = utils.sanitize_input(data["username"].strip())
1✔
60
        passphrase = data["password"].strip()
1✔
61
        email = utils.sanitize_input(data["email"].strip())
1✔
62
        derivedKeySalt = utils.sanitize_input(data["derivedKeySalt"].strip())
1✔
63
        zke_key = utils.sanitize_input(data["ZKE_key"].strip())
1✔
64
        passphraseSalt = utils.sanitize_input(data["passphraseSalt"].strip())
1✔
65
    except Exception as e: # pragma: no cover
66
        logging.info(e)
67
        return {"message": "Invalid request"}, 400
68
    
69
    if not username or not passphrase or not email or not derivedKeySalt or not zke_key or not passphraseSalt:
1✔
70
        return {"message": "Missing parameters"}, 400
1✔
71
    if len(username) > 250:
1✔
72
        return {"message": "Username is too long"}, 400
1✔
73
    if not utils.check_email(email) :
1✔
74
        return {"message": "Bad email format"}, 401
1✔
75
    userDB = UserDB()
1✔
76
    user = userDB.getByEmail(email)
1✔
77
    if user:
1✔
78
        return {"message": "User already exists"}, 409
×
79
    check_username = userDB.getByUsername(username)
1✔
80
    if check_username:
1✔
81
        return {"message": "Username already exists"}, 409
1✔
82
    bcrypt = Bcrypt(passphrase)
1✔
83
    try : 
1✔
84
        hashedpw = bcrypt.hashpw()
1✔
85
    except ValueError as e:
1✔
86
        logging.debug(e)
1✔
87
        return {"message": "Password is too long"}, 400
1✔
88
    except Exception as e:
×
89
        logging.warning("Uknown error occured while hashing password" + str(e))
×
90
        return {"message": "Unknown error while hashing your password"}, 500
×
91
    try:
1✔
92
        today = datetime.datetime.now().strftime("%d/%m/%Y")
1✔
93
        user = userDB.create(username=username, email=email, password=hashedpw, randomSalt=derivedKeySalt, isVerified=0,isBlocked=0, passphraseSalt=passphraseSalt, today=today)
1✔
94
    except Exception as e:
×
95
        logging.error("Unknown error while creating user" + str(e))
×
96
        return {"message": "Unknown error while creating user"}, 500
×
97
    if user :
1✔
98
        try:
1✔
99
            zke_db = ZKE_DB()
1✔
100
            zke_key = zke_db.create(user.id, zke_key)
1✔
101
        except Exception as e:
×
102
           zke_key = None
×
103
        if zke_key:
1✔
104
            _,session_token = SessionTokenRepo().generate_session_token(user.id)
1✔
105
            if conf.features.emails.require_email_validation:
1✔
106
                try:
1✔
107
                    send_verification_email(user=user.id, context_={"user":user.id}, token_info={"user":user.id})
1✔
108
                except Exception as e:
1✔
109
                    logging.error("Unknown error while sending verification email" + str(e))
1✔
110
            response = Response(status=201, mimetype="application/json", response=json.dumps({"message": "User created", "email_verification_required":conf.features.emails.require_email_validation}))
1✔
111
            response.set_cookie("session-token", session_token, httponly=True, secure=True, samesite="Lax", max_age=3600)
1✔
112
            return response
1✔
113
        else :
114
            userDB.delete(user.id)
×
115
            logging.error("Unknown error while storing user ZKE keys" + str(username))
×
116
            return {"message": "Unknown error while registering user encrypted keys"}, 500
×
117
    else :
118
        logging.error("Unknown error while creating user" + str(username))
×
119
        return {"message": "Unknown error while creating user"}, 500
×
120

121

122

123
# POST /login
124
@ip_rate_limit
1✔
125
def login(ip, body):
1✔
126
    passphrase = body["password"].strip()
1✔
127
    email = utils.sanitize_input(body["email"]).strip()
1✔
128
    rate_limiting_db = Rate_Limiting_DB()
1✔
129
    if not passphrase or not email:
1✔
130
        return {"message": "generic_errors.missing_params"}, 400
1✔
131
    if(not utils.check_email(email) ):
1✔
132
        return {"message": "generic_errors.bad_email"}, 403
1✔
133
    userDB = UserDB()
1✔
134
    user = userDB.getByEmail(email)
1✔
135
    bcrypt = Bcrypt(passphrase)
1✔
136
    if not user:
1✔
137
        logging.info("User " + str(email) + " tried to login but does not exist. A fake password is checked to avoid timing attacks")
1✔
138
        fakePassword = ''.join(random.choices(string.ascii_letters, k=random.randint(10, 20)))
1✔
139
        bcrypt.checkpw(fakePassword)
1✔
140
        
141
        rate_limiting_db.add_failed_login(ip)
1✔
142
        return {"message": "generic_errors.invalid_creds"}, 403
1✔
143
    logging.info(f"User {user.id} is trying to logging in from gateway {request.remote_addr} and IP {ip}. X-Forwarded-For header is {request.headers.get('X-Forwarded-For')}")
1✔
144
    checked = bcrypt.checkpw(user.password)
1✔
145
    if not checked:
1✔
146
        rate_limiting_db.add_failed_login(ip, user.id)
1✔
147
        return {"message": "generic_errors.invalid_creds"}, 403
1✔
148
    if user.isBlocked: # only authenticated users can see the blocked status
1✔
149
        return {"message": "blocked"}, 403
1✔
150
    
151
    rate_limiting_db.flush_login_limit(ip)
1✔
152
    session_id, session_token = SessionTokenRepo().generate_session_token(user.id)
1✔
153
    refresh_token = refresh_token_func.generate_refresh_token(user.id, session_id)
1✔
154
    if not conf.features.emails.require_email_validation: # we fake the isVerified status if email validation is not required
1✔
155
        response = Response(status=200, mimetype="application/json", response=json.dumps({"username": user.username, "id":user.id, "derivedKeySalt":user.derivedKeySalt, "isGoogleDriveSync": GoogleDriveIntegrationDB().is_google_drive_enabled(user.id), "role":user.role, "isVerified":True}))
×
156
    elif user.isVerified:
1✔
157
        response = Response(status=200, mimetype="application/json", response=json.dumps({"username": user.username, "id":user.id, "derivedKeySalt":user.derivedKeySalt, "isGoogleDriveSync": GoogleDriveIntegrationDB().is_google_drive_enabled(user.id), "role":user.role, "isVerified":user.isVerified}))
1✔
158
    else:
159
        response = Response(status=200, mimetype="application/json", response=json.dumps({"isVerified":user.isVerified}))
1✔
160
    userDB.update_last_login_date(user.id)
1✔
161
    response.set_auth_cookies(session_token, refresh_token)
1✔
162
    return response
1✔
163

164
#POST logout
165
@require_valid_user
1✔
166
def logout(_):
1✔
167
    session_token = request.cookies.get("session-token")
1✔
168
    session_repo = SessionTokenRepo()
1✔
169
    session = session_repo.get_session_token(session_token)
1✔
170
    if not session:
1✔
171
        return {"message": "Session not found"}, 404
×
172
    utils.revoke_session(session_id=session.id)
1✔
173
    response = Response(status=200, mimetype="application/json", response=json.dumps({"message": "Logged out"}))
1✔
174
    response.delete_cookie("session-token")
1✔
175
    response.delete_cookie("refresh-token")
1✔
176
    return response
1✔
177

178

179
#GET /login/specs
180
def get_login_specs(username):
1✔
181
    rate_limiting_db = Rate_Limiting_DB()
1✔
182
    logging.debug("User " + str(username) + " is trying to get login specs")
1✔
183
    ip = utils.get_ip(request)
1✔
184
    if ip:
1✔
185
        if rate_limiting_db.is_login_rate_limited(ip):
1✔
186
            return {"message": "Too many requests", 'ban_time':conf.features.rate_limiting.login_ban_time}, 429
1✔
187
    if(not utils.check_email(username)):
1✔
188
        return {"message": "Bad request"}, 400
1✔
189
    userDB = UserDB()
1✔
190
    user = userDB.getByEmail(username)
1✔
191
    if user :
1✔
192
        return {"passphrase_salt": user.passphraseSalt}, 200
1✔
193
    else :
194
        fake_salt = base64.b64encode(os.urandom(16)).decode("utf-8")
1✔
195
        return {"passphrase_salt": fake_salt}, 200
1✔
196

197
    
198

199
    
200
    
201
#GET /encrypted_secret/{uuid}
202
@require_valid_user
1✔
203
def get_encrypted_secret(user_id, uuid):
1✔
204
    totp_secretDB =  TOTP_secretDB()
1✔
205
    enc_secret = totp_secretDB.get_enc_secret_of_user_by_uuid(user_id, uuid)
1✔
206
    if not enc_secret:
1✔
207
        return {"message": "Forbidden"}, 403
1✔
208
    else:
209
        if enc_secret.user_id == user_id:
1✔
210
            return {"enc_secret": enc_secret.secret_enc}, 200
1✔
211
        else :    
212
            logging.warning("User " + str(user_id) + " tried to access secret " + str(uuid) + " which is not his")
×
213
            return {"message": "Forbidden"}, 403
×
214
        
215
#POST /encrypted_secret
216
@require_valid_user
1✔
217
def add_encrypted_secret(user_id, body):
1✔
218
    enc_secret = utils.sanitize_input(body["enc_secret"]).strip()
1✔
219
    secret_uuid = ""
1✔
220
    totp_secretDB =  TOTP_secretDB()
1✔
221
    while secret_uuid == "":
1✔
222
        tmp_uuid = str(uuid4())
1✔
223
        if not totp_secretDB.get_enc_secret_by_uuid(tmp_uuid):
1✔
224
            secret_uuid = tmp_uuid
1✔
225
        
226
    if totp_secretDB.add(user_id, enc_secret, secret_uuid):
1✔
227
        return {"uuid": secret_uuid}, 201
1✔
228
    else :
229
        logging.warning("Unknown error while adding encrypted secret for user " + str(user_id))
×
230
        return {"message": "Unknown error while adding encrypted secret"}, 500
×
231

232
#PUT /encrypted_secret/{uuid}
233
@require_valid_user
1✔
234
def update_encrypted_secret(user_id,uuid, body):
1✔
235
    enc_secret = body["enc_secret"]
1✔
236
    
237
    totp_secretDB =  TOTP_secretDB()
1✔
238
    totp = totp_secretDB.get_enc_secret_of_user_by_uuid(user_id, uuid)
1✔
239
    if not totp:
1✔
240
        logging.warning("User " + str(user_id) + " tried to update secret " + str(uuid) + " which does not exist")
1✔
241
        return {"message": "Forbidden"}, 403
1✔
242
    else:
243
        if totp.user_id != user_id:
1✔
244
            logging.warning("User " + str(user_id) + " tried to update secret " + str(uuid) + " which is not his")
×
245
            return {"message": "Forbidden"}, 403
×
246
        totp = totp_secretDB.update_secret(uuid=uuid, enc_secret=enc_secret, user_id=user_id)
1✔
247
        if totp == None:
1✔
248
                logging.warning("User " + str(user_id) + " tried to update secret " + str(uuid) + " but an error occurred server side while storing your encrypted secret")
×
249
                return {"message": "An error occurred server side while storing your encrypted secret"}, 500
×
250
        else:
251
                return {"message": "Encrypted secret updated"}, 201
1✔
252

253
#DELETE /encrypted_secret/{uuid}
254
@require_valid_user
1✔
255
def delete_encrypted_secret(user_id,uuid):
1✔
256
    if(uuid == ""):
1✔
257
        return {"message": "Invalid request"}, 400
×
258
    totp_secretDB =  TOTP_secretDB()
1✔
259
    totp = totp_secretDB.get_enc_secret_of_user_by_uuid(user_id, uuid)
1✔
260
    if not totp:
1✔
261
        logging.debug("User " + str(user_id) + " tried to delete secret " + str(uuid) + " which does not exist")
1✔
262
        return {"message": "Forbidden"}, 403
1✔
263
    else:
264
        if totp.user_id != user_id:
1✔
265
            logging.warning("User " + str(user_id) + " tried to delete secret " + str(uuid) + " which is not his")
×
266
            return {"message": "Forbidden"}, 403
×
267
        if totp_secretDB.delete(uuid=uuid, user_id= user_id):
1✔
268
            return {"message": "Encrypted secret deleted"}, 201
1✔
269
        else:
270
            logging.warning("Unknown error while deleting encrypted secret for user " + str(user_id) )
×
271
            return {"message": "Unknown error while deleting encrypted secret"}, 500
×
272
        
273

274
#GET /all_secrets
275
@require_valid_user
1✔
276
def get_all_secrets(user_id):
1✔
277
    totp_secretDB =  TOTP_secretDB()
1✔
278
    enc_secrets = totp_secretDB.get_all_enc_secret_by_user_id(user_id)
1✔
279
    if not enc_secrets:
1✔
280
        return {"message": "No secret found"}, 404
1✔
281
    else:
282
        secrets = []
1✔
283
        for enc_secret in enc_secrets:
1✔
284
           secret = {"uuid": enc_secret.uuid, "enc_secret": enc_secret.secret_enc}
1✔
285
           secrets.append(secret)
1✔
286
        return {"enc_secrets": secrets}, 200
1✔
287

288

289
#GET /zke_encrypted_key
290
@require_valid_user
1✔
291
def get_ZKE_encrypted_key(user_id):
1✔
292
        logging.info(user_id)
1✔
293
        zke_db = ZKE_DB()
1✔
294
        zke_key = zke_db.getByUserId(user_id)
1✔
295
        if zke_key:
1✔
296
                return {"zke_encrypted_key": zke_key.ZKE_key}, 200
1✔
297
        else:
298
            return {"message": "No ZKE key found for this user"}, 404
×
299

300

301

302
#PUT /update/email
303
@require_userid
1✔
304
def update_email(user_id,body):
1✔
305
   
306
    email = utils.sanitize_input(body["email"]).strip()
1✔
307
    if not utils.check_email(email):
1✔
308
        return {"message": "This email doesn't have the right format. Check it and try again"}, 400
1✔
309
         
310
    userDb = UserDB()
1✔
311
    already_existing_user = userDb.getByEmail(email)
1✔
312
    if already_existing_user:
1✔
313
        if already_existing_user.id == user_id:
1✔
314
            return {"message":email},201
1✔
315
        else:
316
            return {"message": "This email already exists"}, 403
1✔
317
    old_mail = userDb.getById(user_id).mail
1✔
318
    user = userDb.update_email(user_id=user_id, email=email, isVerified=0)
1✔
319
    if user:
1✔
320
        try:
1✔
321
            ip = utils.get_ip(request)
1✔
322
            thread = threading.Thread(target=utils.send_information_email,args=(ip, old_mail, "Your email address has been updated"))
1✔
323
            thread.start()
1✔
324
        except Exception as e:
×
325
            logging.error("Unknown error while sending information email" + str(e))
×
326
        if conf.features.emails.require_email_validation:
1✔
327
            try:
1✔
328
           
329
                send_verification_email(user=user_id, context_={"user":user_id}, token_info={"user":user_id})
1✔
330
            except Exception as e:
1✔
331
                logging.error("Unknown error while sending verification email" + str(e))
1✔
332
            return {"message":user.mail},201
1✔
333
        else:
334
            return {"message":user.mail},201
×
335
    else :
336
        logging.warning("An error occured while updating email of user " + str(user_id))
×
337
        return {"message": "Unknown error while updating email"}, 500
×
338

339
#PUT /update/username
340
@require_valid_user
1✔
341
def update_username(user_id,body):
1✔
342
    username = utils.sanitize_input(body["username"].strip())
1✔
343
    if not username:
1✔
344
        return {"message": "generic_errors.missing_params"}, 400
×
345
    userDb = UserDB()
1✔
346
    if len(username) > 250:
1✔
347
        return {"message": "Username is too long"}, 400
1✔
348
    already_existing_user = userDb.getByUsername(username)
1✔
349
    if already_existing_user:
1✔
350
        if already_existing_user.id == user_id:
1✔
351
            return {"message":username},201
1✔
352
        else:
353
            return {"message": "generic_errors.username_exists"}, 409
1✔
354
    user = userDb.update_username(user_id=user_id, username=username)
1✔
355
    if user:
1✔
356
        return {"message":user.username},201
1✔
357
    else :
358
        logging.warning("An error occured while updating username of user " + str(user_id))
×
359
        return {"message": "Unknown error while updating username"}, 500
×
360
   
361
#PUT /update/vault 
362
@require_valid_user
1✔
363
def update_vault(user_id, body):
1✔
364
    returnJson = {"message": "Internal server error", "hashing":-1, "totp":-1, "user":-1, "zke":-1}
1✔
365
    try:
1✔
366
        newPassphrase = body["new_passphrase"].strip()
1✔
367
        old_passphrase = body["old_passphrase"].strip()
1✔
368
        enc_vault = body["enc_vault"].strip()
1✔
369
        enc_vault = json.loads(enc_vault)
1✔
370
        zke_key = body["zke_enc"].strip()
1✔
371
        passphrase_salt = body["passphrase_salt"].strip()
1✔
372
        derivedKeySalt = body["derived_key_salt"].strip()
1✔
373
    except Exception as e:
1✔
374
        logging.error(e)
1✔
375
        return '{"message": "Invalid request"}', 400
1✔
376

377
    if not newPassphrase or not old_passphrase or not zke_key or not passphrase_salt or not derivedKeySalt:
1✔
378
        return {"message": "Missing parameters"}, 400
1✔
379
    
380
    
381
    
382
    is_vault_valid, vault_validation_msg = utils.unsafe_json_vault_validation(enc_vault)
1✔
383
    if not is_vault_valid:
1✔
384
        return {"message": vault_validation_msg}, 400
1✔
385
    userDb = UserDB()
1✔
386
    zke_db = ZKE_DB()
1✔
387
    totp_secretDB = TOTP_secretDB()
1✔
388

389
    user = userDb.getById(user_id)
1✔
390
    bcrypt = Bcrypt(old_passphrase)
1✔
391
    if not bcrypt.checkpw(user.password):
1✔
392
        logging.info("User " + str(user_id) + " tried to update his vault but provided passphrase is wrong.")
1✔
393
        return {"message": "Invalid passphrase"}, 403
1✔
394
    bcrypt = Bcrypt(newPassphrase)
1✔
395
    try :
1✔
396
        hashedpw = bcrypt.hashpw()
1✔
397
    except ValueError as e:
1✔
398
        logging.warning(e)
1✔
399
        return {"message": "passphrase too long. It must be <70 char"}, 400
1✔
400
    except Exception as e:
×
401
        logging.warning("Uknown error occured while hashing password" + str(e))
×
402
        returnJson["hashing"]=0
×
403
        return returnJson, 500
×
404

405
    logging.info(f"User {user_id} is updating their passphrase. Old passphrase is verified. Proceeding with vault update. IP : {utils.get_ip(request)}")
1✔
406
    
407
    old_vault = TOTP_secretDB().get_all_enc_secret_by_user_id(user_id)
1✔
408
    if len(old_vault) != len(enc_vault):
1✔
409
        logging.warning(f"User {user_id} tried to update his vault but the number of secrets in the new vault is different from the old one. The update is rejected.")
1✔
410
        return {"message": "To avoid the loss of your information, Zero-TOTP is rejecting this request because it has detected that you might lose data. Please contact quickly Zero-TOTP developers to fix issue."}, 400
1✔
411
    for secret in old_vault:
1✔
412
        if secret.uuid not in enc_vault:
1✔
413
            logging.warning(f"FORBIDDEN. The user {user_id} tried to update but the secret {secret.uuid} is missing in the new vault. The update is rejected.")
1✔
414
            return {"message": "Forbidden action. Zero-TOTP detected that you were updating object you don't have access to. The request is rejected."}, 403
1✔
415
    
416

417
    returnJson["hashing"]=1
1✔
418
    errors = 0
1✔
419
    for secret in enc_vault.keys():
1✔
420
        totp = totp_secretDB.get_enc_secret_of_user_by_uuid(user_id, secret)
1✔
421
        if not totp:
1✔
422
            totp = totp_secretDB.add(user_id=user_id, enc_secret=enc_vault[secret], uuid=secret)
×
423
            if not totp:
×
UNCOV
424
                logging.warning("Unknown error while adding encrypted secret for user " + str(user_id))
×
UNCOV
425
                errors = 1
×
426
        else:
427
            if totp.user_id != user_id:
1✔
UNCOV
428
                logging.warning("User " + str(user_id) + " tried to update secret " + str(secret) + " which is not his")
×
UNCOV
429
                errors = 1
×
430
            else :
431
                totp = totp_secretDB.update_secret(uuid=secret, enc_secret=enc_vault[secret], user_id=user_id)
1✔
432
                if totp == None:
1✔
UNCOV
433
                    logging.warning("User " + str(user_id) + " tried to update secret " + str(secret) + " but an error occurred server side while storing your  encrypted secret")
×
UNCOV
434
                    errors = 1
×
435
    zke = zke_db.update(user_id, zke_key)
1✔
436
    userUpdated = userDb.update(user_id=user_id, passphrase=hashedpw, passphrase_salt=passphrase_salt, derivedKeySalt=derivedKeySalt)
1✔
437
    returnJson["totp"]=1 if errors == 0 else 0
1✔
438
    returnJson["user"]=1 if userUpdated else 0
1✔
439
    returnJson["zke"]=1 if zke else 0
1✔
440
    if errors == 0 and userUpdated and zke:
1✔
441
        try:
1✔
442
            ip = utils.get_ip(request)
1✔
443
            thread = threading.Thread(target=utils.send_information_email,args=(ip, user.mail, "Your vault passphrase has been updated"))
1✔
444
            thread.start()
1✔
UNCOV
445
        except Exception as e:
×
UNCOV
446
            logging.error("Unknown error while sending information email" + str(e))
×
447
        logging.info(f"User {user_id} has successfully updated their vault and passphrase.")
1✔
448
        return {"message": "Vault updated"}, 201
1✔
449
    else:
UNCOV
450
        logging.warning("An error occured while updating passphrase of user " + str(user_id))
×
UNCOV
451
        return returnJson, 500
×
452

453

454
@require_valid_user
1✔
455
def export_vault(user_id):
1✔
456
    
457
    vault = {"version":1, "date": str(datetime.datetime.utcnow())}
1✔
458
    user = UserDB().getById(user_id=user_id)
1✔
459
    zkeKey = ZKE_DB().getByUserId(user_id=user_id)
1✔
460
    totp_secrets_list = TOTP_secretDB().get_all_enc_secret_by_user_id(user_id=user_id)
1✔
461
    if not user or not zkeKey:
1✔
UNCOV
462
        return {"message" : "User not found"}, 404
×
463
    
464
    vault["derived_key_salt"] = user.derivedKeySalt
1✔
465
    vault["zke_key_enc"] = zkeKey.ZKE_key
1✔
466
    secrets = utils.get_all_secrets_sorted(totp_secrets_list)
1✔
467
    vault["secrets"] = secrets
1✔
468
    vault["secrets_sha256sum"] = sha256(json.dumps(vault["secrets"],  sort_keys=True).encode("utf-8")).hexdigest()
1✔
469
    vault_b64 = base64.b64encode(json.dumps(vault).encode("utf-8")).decode("utf-8")
1✔
470
    signature = API_signature().sign_rsa(vault_b64)
1✔
471
    vault = vault_b64 + "," + signature
1✔
472
    return vault, 200
1✔
473

474
# GET /role
475
@require_userid
1✔
476
def get_role(user_id, *args, **kwargs):
1✔
477
    user = UserDB().getById(user_id=user_id)
1✔
478
    if not user:
1✔
UNCOV
479
        return {"message" : "User not found"}, 404
×
480
    elif not user.isVerified and conf.features.emails.require_email_validation:
1✔
481
        return {"role" : "not_verified"}, 200
1✔
482
    return {"role": user.role}, 200
1✔
483

484

485
    
486
# GET /google-drive/oauth/authorization-flow
487
def get_authorization_flow():
1✔
488
    if not conf.features.google_drive.enabled:
1✔
489
        return {"message": "Oauth is disabled on this tenant. Contact the tenant administrator to enable it."}, 403 
1✔
490
    authorization_url, state = oauth_flow.get_authorization_url()
1✔
491
    flask.session["state"] = state
1✔
492
    logging.info("State stored in session : " + str(state))
1✔
493
    return {"authorization_url": authorization_url, "state":state}, 200
1✔
494

495
# GET /google-drive/oauth/callback
496
@require_valid_user
1✔
497
def oauth_callback(user_id):
1✔
498
    if not conf.features.google_drive.enabled:
1✔
UNCOV
499
        return {"message": "Oauth is disabled on this tenant. Contact the tenant administrator to enable it."}, 403
×
500
    frontend_URI = conf.environment.frontend_URI
1✔
501
    try: 
1✔
502
        credentials = oauth_flow.get_credentials(request.url, flask.session["state"])
1✔
503

504
        if credentials == None:
1✔
505
            response = make_response(redirect(frontend_URI + "/oauth/callback?status=error&state="+str(flask.session["state"]),  code=302))
1✔
506
            flask.session.pop("state")
1✔
507
            return response
1✔
508

509
        response = make_response(redirect(frontend_URI + "/oauth/callback?status=success&state="+str(flask.session["state"]),    code=302))
1✔
510
        creds_b64 = base64.b64encode(json.dumps(credentials).encode("utf-8")).decode("utf-8")
1✔
511
        sse = ServiceSideEncryption()
1✔
512
        encrypted_cipher = sse.encrypt(creds_b64)
1✔
513
        expires_at = int(datetime.datetime.strptime(credentials["expiry"], "%Y-%m-%d %H:%M:%S.%f").timestamp())
1✔
514
        token_db = Oauth_tokens_db()
1✔
515
        tokens = token_db.get_by_user_id(user_id)
1✔
516
        if tokens:
1✔
517
            tokens = token_db.update(user_id=user_id, enc_credentials=encrypted_cipher["ciphertext"] ,expires_at=expires_at, nonce=encrypted_cipher["nonce"], tag=encrypted_cipher["tag"])
1✔
518
        else:
519
            tokens = token_db.add(user_id=user_id, enc_credentials=encrypted_cipher["ciphertext"], expires_at=expires_at, nonce=encrypted_cipher["nonce"], tag=encrypted_cipher["tag"])
1✔
520
        if tokens:
1✔
521
            google_drive_int = GoogleDriveIntegrationDB()
1✔
522
            integration = google_drive_int.get_by_user_id(user_id)
1✔
523
            if integration == None:
1✔
524
                google_drive_int.create(user_id=user_id, google_drive_sync=1)
1✔
525
            else :
526
                google_drive_int.update_google_drive_sync(user_id=user_id, google_drive_sync=1)
1✔
527
            flask.session.pop("state")
1✔
528
            return response
1✔
529
        else:
530
            logging.warning("Unknown error while storing encrypted tokens for user " + str(user_id))
1✔
531
            response = make_response(redirect(frontend_URI + "/oauth/callback?status=error&state="+flask.session.get('state'),  code=302))
1✔
532
            flask.session.pop("state")
1✔
533
            return response
1✔
534
    except oauth_flow.NoRefreshTokenError as e:
1✔
UNCOV
535
        logging.warning(f"Oauth callback for user {user_id} failed because no refresh token was provided.")
×
UNCOV
536
        return make_response(redirect(frontend_URI + "/oauth/callback?status=refresh-token-error&state=none",  code=302))
×
537
    except Exception as e:
1✔
538
        logging.error("Error while exchanging the authorization code " + str(e))
1✔
539
        logging.error(traceback.format_exc())
1✔
540
        if flask.session.get("state"):
1✔
541
            response = make_response(redirect(frontend_URI + "/oauth/callback?status=error&state="+flask.session.get('state'),  code=302))
1✔
542
            flask.session.pop("state")
1✔
543
        else :
544
            response = make_response(redirect(frontend_URI + "/oauth/callback?status=error&state=none",  code=302))
1✔
545
        return response
1✔
546

547

548

549
#GET /google-drive/option
550
@require_valid_user
1✔
551
def get_google_drive_option(user_id):
1✔
552
    if not conf.features.google_drive.enabled:
1✔
UNCOV
553
        return {"message": "Google drive sync is not enabled on this tenant."}, 403
×
554
    google_drive_integrations = GoogleDriveIntegrationDB()
1✔
555
    status = google_drive_integrations.is_google_drive_enabled(user_id)
1✔
556
    if status:
1✔
557
        return {"status": "enabled"}, 200
1✔
558
    else:
559
        return {"status": "disabled"}, 200
1✔
560
    
561
#PUT /google-drive/backup
562
@require_valid_user
1✔
563
def backup_to_google_drive(user_id, *args, **kwargs):
1✔
564
    if not conf.features.google_drive.enabled:
1✔
565
        return {"message": "Google drive sync is not enabled on this tenant."}, 403
1✔
566
    
567
    token_db = Oauth_tokens_db()
1✔
568
    oauth_tokens = token_db.get_by_user_id(user_id)
1✔
569
    google_drive_integrations = GoogleDriveIntegrationDB()
1✔
570

571
    if not oauth_tokens or not google_drive_integrations.is_google_drive_enabled(user_id):
1✔
572
        return {"message": "Google drive sync is not enabled"}, 403
1✔
573
    sse = ServiceSideEncryption()
1✔
574
    creds_b64 = sse.decrypt( ciphertext=oauth_tokens.enc_credentials, nonce=oauth_tokens.cipher_nonce, tag=oauth_tokens.cipher_tag)
1✔
575
    if creds_b64 == None:
1✔
576
        logging.warning("Error while decrypting credentials for user " + str(user_id))
1✔
577
        return {"message": "Error while decrypting credentials"}, 500
1✔
578
    credentials = json.loads(base64.b64decode(creds_b64).decode("utf-8"))
1✔
579
    try:
1✔
580
        exported_vault,_ = export_vault(user=user_id, context_={"user":user_id}, token_info={"user":user_id})
1✔
581
        google_drive_api.backup(credentials=credentials, vault=exported_vault)
1✔
582
        google_drive_api.clean_backup_retention(credentials=credentials, user_id=user_id)
1✔
583
        return {"message": "Backup done"}, 201
1✔
584
    except Exception as e:
1✔
585
        logging.error("Error while backing up to google drive " + str(e))
1✔
586
        return {"message": "Error while backing up to google drive"}, 500
1✔
587

588

589
# GET /google-drive/last-backup/verify
590
@require_valid_user
1✔
591
def verify_last_backup(user_id):
1✔
592
    if not conf.features.google_drive.enabled:
1✔
593
        return {"message": "Google drive sync is not enabled on this tenant."}, 403
1✔
594
    token_db = Oauth_tokens_db()
1✔
595
    oauth_tokens = token_db.get_by_user_id(user_id)
1✔
596
    google_drive_integrations = GoogleDriveIntegrationDB()
1✔
597
    if not oauth_tokens or not google_drive_integrations.is_google_drive_enabled(user_id):
1✔
598
        return {"message": "Google drive sync is not enabled"}, 403
1✔
599
    sse = ServiceSideEncryption()
1✔
600
    creds_b64 = sse.decrypt( ciphertext=oauth_tokens.enc_credentials, nonce=oauth_tokens.cipher_nonce, tag=oauth_tokens.cipher_tag)
1✔
601
    if creds_b64 == None:
1✔
602
        logging.error("Error while decrypting credentials for user " + str(user_id) + ". creds_b64 = " + str(creds_b64))
1✔
603
        return {"error": "Error while connecting to the Google API"}, 500
1✔
604
    
605
    
606
    credentials = json.loads(base64.b64decode(creds_b64).decode("utf-8"))
1✔
607
    if credentials.get("refresh_token") is None:
1✔
UNCOV
608
        logging.warning(f"User {user_id} tried to verify last backup but no refresh token was found in the credentials. This is a blocking error.")
×
UNCOV
609
        return {"message": "Error while connecting to the Google API", "error_id": "3c071611-744a-4c93-95c8-c87ee3fce00d"}, 400
×
610
    try:
1✔
611
        last_backup_checksum, last_backup_date = google_drive_api.get_last_backup_checksum(credentials)
1✔
612
    except utils.CorruptedFile as e:
1✔
613
        logging.warning("Error while getting last backup checksum " + str(e))
1✔
614
        return {"status": "corrupted_file"}, 200
1✔
615
    except utils.FileNotFound as e:
1✔
616
        logging.warning("Error while getting last backup checksum " + str(e))
1✔
617
        return {"error": "file_not_found"}, 404
1✔
618
    totp_secrets_list = TOTP_secretDB().get_all_enc_secret_by_user_id(user_id=user_id)
1✔
619
    secrets = utils.get_all_secrets_sorted(totp_secrets_list)
1✔
620
    sha256sum = sha256(json.dumps(secrets,  sort_keys=True).encode("utf-8")).hexdigest()
1✔
621
    if last_backup_checksum == sha256sum:
1✔
622
        google_drive_api.clean_backup_retention(credentials=credentials, user_id=user_id)
1✔
623
        return {"status": "ok", "is_up_to_date": True, "last_backup_date": last_backup_date }, 200
1✔
624
    else:
625
        return {"status": "ok", "is_up_to_date": False, "last_backup_date": "" }, 200
1✔
626

627

628
# DELETE /google-drive/option
629
@require_valid_user
1✔
630
def delete_google_drive_option(user_id):
1✔
631
    if not conf.features.google_drive.enabled:
1✔
UNCOV
632
        return {"message": "Google drive sync is not enabled on this tenant."}, 403
×
633
    google_integration = GoogleDriveIntegrationDB()
1✔
634
    token_db = Oauth_tokens_db()
1✔
635
    oauth_tokens = token_db.get_by_user_id(user_id)
1✔
636
    
637
    if google_integration.get_by_user_id(user_id) is None:
1✔
638
        google_integration.create(user_id, 0)
1✔
639
    if not oauth_tokens:
1✔
640
        GoogleDriveIntegrationDB().update_google_drive_sync(user_id, 0)
1✔
641
        return {"message": "Google drive sync is not enabled"}, 200
1✔
642
    sse = ServiceSideEncryption()
1✔
643
    try:
1✔
644
        creds_b64 = sse.decrypt( ciphertext=oauth_tokens.enc_credentials, nonce=oauth_tokens.cipher_nonce,  tag=oauth_tokens.cipher_tag)
1✔
645
        if creds_b64 == None:
1✔
646
            token_db.delete(user_id)
1✔
647
            GoogleDriveIntegrationDB().update_google_drive_sync(user_id, 0)
1✔
648
            return {"message": "Error while decrypting credentials"}, 200
1✔
649
        credentials = json.loads(base64.b64decode(creds_b64).decode("utf-8"))
1✔
650
        google_drive_api.revoke_credentials(credentials)
1✔
651
        token_db.delete(user_id)
1✔
652
        GoogleDriveIntegrationDB().update_google_drive_sync(user_id, 0)
1✔
653
        return {"message": "Google drive sync disabled"}, 200
1✔
654
    except Exception as e:
1✔
655
        logging.error("Error while deleting backup from google drive " + str(e))
1✔
656
        token_db.delete(user_id)
1✔
657
        GoogleDriveIntegrationDB().update_google_drive_sync(user_id, 0)
1✔
658
        return {"message": "Error while revoking credentials"}, 200
1✔
659

660
@require_valid_user
1✔
661
def get_preferences(user_id,fields):
1✔
662
    valid_fields = [ "favicon_policy", "derivation_iteration", "backup_lifetime", "backup_minimum", "autolock_delay"]
1✔
663
    all_field = fields == "all" 
1✔
664
    fields_asked = []
1✔
665
    if not all_field:
1✔
666
        fields = fields.split(",")
1✔
667
        for field in fields:
1✔
668
            if field not in fields_asked:
1✔
669
                for valid_field in valid_fields:
1✔
670
                    if field == valid_field:
1✔
671
                        fields_asked.append(valid_field)
1✔
672

673
        if len(fields_asked) == 0:
1✔
674
            return {"message": "Invalid request"}, 400
1✔
675
    
676
    user_preferences = {}
1✔
677
    preferences_db = PreferencesDB()
1✔
678
    preferences = preferences_db.get_preferences_by_user_id(user_id)
1✔
679
    if "favicon_policy" in fields_asked or all_field:
1✔
680
        user_preferences["favicon_policy"] = preferences.favicon_preview_policy
1✔
681
    if  "derivation_iteration" in fields_asked or all_field:
1✔
682
        user_preferences["derivation_iteration"] = preferences.derivation_iteration
1✔
683
    if "backup_lifetime" in fields_asked or all_field:
1✔
684
        user_preferences["backup_lifetime"] = preferences.backup_lifetime
1✔
685
    if "backup_minimum" in fields_asked or all_field:
1✔
686
        user_preferences["backup_minimum"] = preferences.minimum_backup_kept
1✔
687
    if "autolock_delay" in fields_asked or all_field:
1✔
688
        user_preferences["autolock_delay"] = preferences.vault_autolock_delay_min
1✔
689
    return user_preferences, 200
1✔
690

691

692
@require_valid_user
1✔
693
def set_preference(user_id, body):
1✔
694
    field = body["id"]
1✔
695
    value = body["value"]
1✔
696
    
697
    valid_fields = [ "favicon_policy", "derivation_iteration", "backup_lifetime", "backup_minimum", "autolock_delay"]
1✔
698
    if field not in valid_fields:
1✔
699
        return {"message": "Invalid request"}, 400
1✔
700
    preferences_db = PreferencesDB()
1✔
701
    if field == "favicon_policy":
1✔
702
        if value not in ["always", "never", "enabledOnly"]:
1✔
703
            return {"message": "Invalid request"}, 400
1✔
704
        preferences = preferences_db.update_favicon(user_id, value)
1✔
705
        if preferences:
1✔
706
            return {"message": "Preference updated"}, 201
1✔
707
        else:# pragma: no cover
708
            return {"message": "Unknown error while updating preference"}, 500
709
    elif field == "derivation_iteration":
1✔
710
        try:
1✔
711
            value = int(value)
1✔
712
        except:
1✔
713
            return {"message": "Invalid request"}, 400
1✔
714
        if value < 1000 or value > 1000000:
1✔
715
            return {"message": "iteration must be between 1000 and 1000000 "}, 400
1✔
716
        preferences = preferences_db.update_derivation_iteration(user_id, value)
1✔
717
        if preferences:
1✔
718
            return {"message": "Preference updated"}, 201
1✔
719
        else:# pragma: no cover
720
            return {"message": "Unknown error while updating preference"}, 500
721
    elif field == "backup_lifetime":
1✔
722
        try:
1✔
723
            value = int(value)
1✔
724
        except:
1✔
725
            return {"message": "Invalid request"}, 400
1✔
726
        if value < 1 :
1✔
727
            return {"message": "backup lifetime must be at least day"}, 400
1✔
728
        preferences = preferences_db.update_backup_lifetime(user_id, value)
1✔
729
        if preferences:
1✔
730
            return {"message": "Preference updated"}, 201
1✔
731
        else:# pragma: no cover
732
            return {"message": "Unknown error while updating preference"}, 500
733
    elif field == "backup_minimum":
1✔
734
        try:
1✔
735
            value = int(value)
1✔
736
        except:
1✔
737
            return {"message": "Invalid request"}, 400
1✔
738
        if value < 1 :
1✔
739
            return {"message": "minimum backup kept must be at least of 1"}, 400
1✔
740
        preferences = preferences_db.update_minimum_backup_kept(user_id, value)
1✔
741
        if preferences:
1✔
742
            return {"message": "Preference updated"}, 201
1✔
743
        else:# pragma: no cover
744
            return {"message": "Unknown error while updating preference"}, 500
745
    elif field == "autolock_delay":
1✔
746
        minimum_delay = 1
1✔
747
        maximum_delay = conf.api.refresh_token_validity/60 # autolock is limited by the ability to refresh the token
1✔
748
        try:
1✔
749
            value = int(value)
1✔
750
        except:
1✔
751
            return {"message": "Invalid request"}, 400
1✔
752
        if value < 1 or value > maximum_delay:
1✔
753
            return {"message": "invalid_duration", "minimum_duration_min":minimum_delay, "maximum_duration_min": maximum_delay}, 400
1✔
754
        preferences = preferences_db.update_autolock_delay(user_id, value)
1✔
755
        if preferences:# pragma: no cover
756
            return {"message": "Preference updated"}, 201
UNCOV
757
        return {"message": "Unknown error while updating preference"}, 500
×
758
    else: # pragma: no cover
759
        return {"message": "Invalid request"}, 400
760

761
# DELETE /google-drive/backup
762
@require_valid_user
1✔
763
def delete_google_drive_backup(user_id):
1✔
764
    if not conf.features.google_drive.enabled:
1✔
765
        return {"message": "Google drive sync is not enabled on this tenant."}, 403
1✔
766
    google_integration = GoogleDriveIntegrationDB()
1✔
767
    token_db = Oauth_tokens_db()
1✔
768
    oauth_tokens = token_db.get_by_user_id(user_id)
1✔
769
    google_drive_option =google_integration.get_by_user_id(user_id) 
1✔
770
    if google_drive_option == None:
1✔
771
        return {"message": "Google drive sync is not enabled"}, 403
1✔
772
    if google_drive_option.isEnabled == 0:
1✔
773
        return {"message": "Google drive sync is not enabled"}, 403
1✔
774
    if not oauth_tokens:
1✔
775
        return {"message": "Google drive sync is not enabled"}, 403
1✔
776
    sse = ServiceSideEncryption()
1✔
777
    try:
1✔
778
        creds_b64 = sse.decrypt( ciphertext=oauth_tokens.enc_credentials, nonce=oauth_tokens.cipher_nonce,  tag=oauth_tokens.cipher_tag)
1✔
779
        credentials = json.loads(base64.b64decode(creds_b64).decode("utf-8"))
1✔
780
        status = google_drive_api.delete_all_backups(credentials=credentials)
1✔
781
        if status :
1✔
782
            return {"message": "Backups deleted"}, 200
1✔
783
        else:
784
            return {"message": "Error while deleting backups"}, 500
1✔
785
    except Exception as e:
1✔
786
        logging.error("Error while deleting backup from google drive " + str(e))
1✔
787
        token_db.delete(user_id)
1✔
788
        GoogleDriveIntegrationDB().update_google_drive_sync(user_id, 0)
1✔
789
        return {"message": "Error while deleting backups"}, 500
1✔
790
    
791

792
@require_passphrase_verification
1✔
793
def delete_account(user_id):
1✔
794
    logging.info("Deleting account for user " + str(user_id))
1✔
795
    user_obj = UserDB().getById(user_id)
1✔
796
    if user_obj.role == "admin":
1✔
797
        return {"message": "Admin cannot be deleted"}, 403
1✔
798
    try: # we try to delete the user backups if possible. If not, this is not a blocking error.
1✔
799
        context = {"user": user_id}
1✔
800
        delete_google_drive_backup(context, user_id, context)
1✔
801
        delete_google_drive_option(context, user_id, context)
1✔
802
    except Exception as e:
1✔
803
        logging.warning("Error while deleting backups for user " + str(user_id) + ". Exception : " + str(e))
1✔
804
    try:
1✔
805
        utils.delete_user_from_database(user_id)
1✔
806
        return {"message": "Account deleted"}, 200
1✔
807
    except Exception as e:
1✔
808
        logging.warning("Error while deleting user from database for user " + str(user_id) + ". Exception : " + str(e))
1✔
809
        return {"message": "Error while deleting account"}, 500
1✔
810
    
811
    
812

813

814
@require_userid
1✔
815
def send_verification_email(user_id):
1✔
816
    if not conf.features.emails.require_email_validation:
1✔
UNCOV
817
        return {"message": "not implemented"}, 501
×
818
    rate_limiting = Rate_Limiting_DB()
1✔
819
    if(rate_limiting.is_send_verification_email_rate_limited(user_id=user_id)):
1✔
820
            return {"message": "Rate limited",  'ban_time':conf.features.rate_limiting.email_ban_time}, 429
1✔
821
    logging.info("Sending verification email to user " + str(user_id))
1✔
822
    user = UserDB().getById(user_id)
1✔
823
    if user == None:
1✔
UNCOV
824
        return {"message": "User not found"}, 404
×
825
    token = utils.generate_new_email_verification_token(user_id=user_id)
1✔
826
    try:
1✔
827
        send_email.send_verification_email(user.mail, token)
1✔
828
        logging.info("Verification email sent to user " + str(user_id))
1✔
829
        ip = utils.get_ip(request=request)
1✔
830
        rate_limiting.add_send_verification_email(ip=ip, user_id=user_id)
1✔
831
        return {"message": "Verification email sent"}, 200
1✔
832
    except Exception as e:
1✔
833
        logging.error("Error while sending verification email to user " + str(user_id) + ". Exception : " + str(e))
1✔
834
        return {"message": "Error while sending verification email"}, 500
1✔
835

836
@require_userid
1✔
837
def verify_email(user_id,body):
1✔
838
    user = UserDB().getById(user_id)
1✔
839
    if user == None:
1✔
UNCOV
840
        return {"message": "generic_errors.user_not_found"}, 404
×
841
    if user.isVerified:
1✔
842
        return {"message": "email_verif.error.already_verified"}, 200
1✔
843
    token_db = EmailVerificationToken_db()
1✔
844
    token_obj = token_db.get_by_user_id(user_id)
1✔
845
    if token_obj == None:
1✔
846
        return {"message": "email_verif.error.no_active_code"}, 403
1✔
847
    if datetime.datetime.now(tz=datetime.timezone.utc).timestamp() > float(token_obj.expiration) :
1✔
848
        token_db.delete(user_id)
1✔
849
        return {"message": "email_verif.error.expired"}, 403
1✔
850
    if int(token_obj.failed_attempts >= 5):
1✔
851
        logging.warning("User " + str(user_id) + " denied verification because of too many failed attempts.")
1✔
852
        return {"message":  "email_verif.error.too_many_failed"}, 403
1✔
853
    if token_obj.token != body["token"]:
1✔
854
        token_db.increase_fail_attempts(user_id)
1✔
855
        logging.warning("User " + str(user_id) + " tried to verify email with wrong token.")
1✔
856
        return {"message": "email_verif.error.failed", "attempt_left":5-(int(token_obj.failed_attempts))}, 403
1✔
857
    token_db.delete(user_id)
1✔
858
    Rate_Limiting_DB().flush_email_verification_limit(user_id)
1✔
859
    user = UserDB().update_email_verification(user_id, True)
1✔
860
    if user:
1✔
861
        return {"message": "Email verified"}, 200
1✔
862
    else:# pragma: no cover
863
        return {"message": "Error while verifying email"}, 500
864

865

866
@require_valid_user
1✔
867
def get_whoami(user_id):
1✔
868
    user = UserDB().getById(user_id)
1✔
869
    return {"username": user.username, "email": user.mail, "id":user_id}, 200
1✔
870

871

872
def get_global_notification():
1✔
873
    notif = Notifications_db().get_last_active_notification()
1✔
874
    if notif is None : 
1✔
875
        return {"display_notification":False}
1✔
876
    if notif.authenticated_user_only:
1✔
877
        return {"display_notification": True, "authenticated_user_only": True}
1✔
878
    else:
879
        return {
1✔
880
            "display_notification": True, 
881
            "authenticated_user_only": False,
882
            "message":notif.message,
883
            "timestamp":float(notif.timestamp)
884
        }
885
    
886
    
887
@require_valid_user
1✔
888
def get_internal_notification(user_id):
1✔
889
    notif = Notifications_db().get_last_active_notification()
1✔
890
    if notif is None : 
1✔
891
        return {"display_notification":False}
1✔
892
    
893
    return {
1✔
894
            "display_notification": True, 
895
            "authenticated_user_only": False,
896
            "message":notif.message,
897
            "timestamp":float(notif.timestamp)
898
        }
899

900

901
# PUT /auth/refresh
902
@ip_rate_limit
1✔
903
def auth_refresh_token(ip, *args, **kwargs):
1✔
904
    session_token = request.cookies.get("session-token")
1✔
905
    refresh_token = request.cookies.get("refresh-token")
1✔
906
    rate_limiting = Rate_Limiting_DB()
1✔
907
    if not session_token or not refresh_token:
1✔
908
        rate_limiting.add_failed_login(ip)
1✔
909
        return {"message": "Missing token"}, 401
1✔
910
    session = SessionTokenRepo().get_session_token(session_token)
1✔
911
    if not session:
1✔
912
        rate_limiting.add_failed_login(ip)
1✔
913
        return {"message": "Invalid token"}, 401
1✔
914
    if session.revoke_timestamp is not None:
1✔
UNCOV
915
        rate_limiting.add_failed_login(ip)
×
UNCOV
916
        return {"message": "Token revoked"}, 401
×
917
    refresh = RefreshToken_db().get_refresh_token_by_hash(sha256(refresh_token.encode("utf-8")).hexdigest())
1✔
918
    if not refresh:
1✔
919
        rate_limiting.add_failed_login(ip, user_id=session.user_id)
1✔
920
        logging.warning(f"Session of user {session.user_id} tried to be refreshed with an refresh token (not present in the db)")
1✔
921
        return {"message": "Access denied"}, 403
1✔
922
    new_session_token, new_refresh_token = refresh_token_func.refresh_token_flow(refresh=refresh, session=session, ip=ip)
1✔
923
    response = Response(status=200, mimetype="application/json", response=json.dumps({"challenge":"ok"}))
1✔
924
    response.set_auth_cookies(new_session_token, new_refresh_token)
1✔
925
    return response
1✔
926

927
# GET /healthcheck
928
def health_check():
1✔
929
    health_status = {}
×
UNCOV
930
    global_healthy = True
×
931
    if conf.api.node_check_enabled:
×
932
        health_status["node"] = hmac.new(conf.api.node_name_hmac_secret.encode('utf-8'), conf.api.node_name.encode('utf-8'), hashlib.sha256).hexdigest()
×
933
    
934
    try:
×
935
        db.session.execute(text('SELECT 1'))
×
936
        health_status["database"] = "OK"
×
937
    except Exception as e:
×
938
        logging.warning("Database healthcheck failed : " + str(e))
×
939
        health_status["database"] = "NOT OK"
×
UNCOV
940
        global_healthy = False
×
941
    health_status["version"] = conf.api.version
×
942
    health_status["build"] = conf.api.build
×
943

944
    health_status["health"] = "OK" if global_healthy else "NOT OK"
×
UNCOV
945
    http_status = 200 if global_healthy else 500
×
946
    
UNCOV
947
    return health_status, http_status
×
948
    
949

950
# GET /vault/signature/public-key
951
def get_public_key():
1✔
952
    with open(conf.api.public_key_path, "r") as f:
1✔
953
        public_key = f.read()
1✔
954
    if ServerRSAKeys().validate_rsa_public_key(public_key):
1✔
955
        return {"public_key": public_key}, 200
1✔
956
    else:
957
        logging.error("This is a critical error from get_public_key(). A public key has been requested but the key is not valid. An error as been returned to the user.")
1✔
958
        return {"message": "Invalid"}, 403
1✔
959
    
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc