• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SeaweedbrainCY / zero-totp / 18141507884

30 Sep 2025 07:40PM UTC coverage: 92.962% (+0.009%) from 92.953%
18141507884

push

github

SeaweedbrainCY
resolve conflict

70 of 78 new or added lines in 5 files covered. (89.74%)

59 existing lines in 4 files now uncovered.

12759 of 13725 relevant lines covered (92.96%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.45
api/Utils/utils.py
1
from app import app
1✔
2
import re
1✔
3
import html
1✔
4
from environment import logging
1✔
5
import datetime
1✔
6
from database.oauth_tokens_repo import Oauth_tokens as Oauth_tokens_repo
1✔
7
from database.user_repo import User as User_repo
1✔
8
from database.totp_secret_repo import TOTP_secret as TOTP_secret_repo
1✔
9
from database.zke_repo import ZKE as ZKE_encryption_key_repo
1✔
10
from database.google_drive_integration_repo import GoogleDriveIntegration as GoogleDriveIntegration_repo
1✔
11
from database.preferences_repo import Preferences as Preferences_repo
1✔
12
from database.email_verification_repo import EmailVerificationToken
1✔
13
from database.rate_limiting_repo import RateLimitingRepo 
1✔
14
from database.refresh_token_repo import RefreshTokenRepo
1✔
15
from database.session_token_repo import SessionTokenRepo
1✔
16
from database.backup_configuration_repo import BackupConfigurationRepo
1✔
17
import os
1✔
18
from hashlib import sha256
1✔
19
from base64 import b64encode
1✔
20
import requests
1✔
21
from Email import send as send_email
1✔
22
import ipaddress
1✔
23
from jsonschema import validate, ValidationError
1✔
24
from environment import conf
1✔
25
import geoip2.database
1✔
26

27

28
class FileNotFound(Exception):
    """Raised when an expected file (e.g. a backup file) cannot be found."""
1✔
30

31
class CorruptedFile(Exception):
    """Error type for a file whose content is unusable.

    NOTE(review): not raised anywhere in the visible part of this module;
    presumably used by callers handling backup files - confirm.
    """
1✔
33

34
def check_email(email):
    """Tell whether ``email`` is a syntactically acceptable e-mail address.

    The whole string must match the address pattern below and be at most
    250 characters long. The pattern only contains lower-case letter
    classes, so upper-case characters are rejected.

    Args:
        email: candidate address.

    Returns:
        ``True`` when the whole string is a valid address, ``False``
        otherwise (including when ``email`` is not a string).
    """
    if not isinstance(email, str) or len(email) > 250:
        return False
    email_regex = r"(?:[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*|\"(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21\x23-\x5b\x5d-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])*\")@(?:(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?|\[(?:(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9]))\.){3}(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9])|[a-z0-9-]*[a-z0-9]:(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21-\x5a\x53-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])+)\])"
    # One anchored match is enough: the address is valid only when the text
    # matched from position 0 is the entire input (no trailing characters).
    matched = re.match(email_regex, email)
    return matched is not None and matched.group() == email
1✔
39

40

41
def sanitize_input(string) -> str:
    """Remove single and double quotes from ``string``, then HTML-escape it.

    Args:
        string: text to sanitize.

    Returns:
        The text without ``'`` and ``"`` characters and with the remaining
        HTML special characters replaced by their entities.
    """
    # Mapping a code point to None deletes that character.
    quote_free = string.translate({ord("'"): None, ord('"'): None})
    return html.escape(quote_free)
1✔
45

46
def get_all_secrets_sorted(totp_secrets_list):
    """Convert TOTP secret records into dicts ordered by UUID.

    Args:
        totp_secrets_list: iterable of objects exposing ``uuid`` and
            ``secret_enc`` attributes.

    Returns:
        A list of ``{"uuid": ..., "enc_secret": ...}`` dicts sorted by
        ascending ``uuid``.
    """
    entries = [
        {"uuid": record.uuid, "enc_secret": record.secret_enc}
        for record in totp_secrets_list
    ]
    entries.sort(key=lambda entry: entry["uuid"])
    return entries
1✔
51

52
def extract_last_backup_from_list(files_list) -> tuple:
    """Find the most recent backup file in ``files_list``.

    A file is a candidate when its ``name`` contains ``_backup`` and its
    ``explicitlyTrashed`` entry is falsy. The backup date is read from the
    part of the name before the first underscore, in the format
    ``DD-MM-YYYY-HH-MM-SS``. Entries without a string name, or whose date
    cannot be parsed, are skipped.

    Args:
        files_list: iterable of mapping-like objects supporting ``.get``.

    Returns:
        ``(file, date)``: the newest candidate and its parsed
        ``datetime.datetime``.

    Raises:
        FileNotFound: when no candidate remains.
    """
    last_backup_file = None
    last_backup_file_date = None
    for file in files_list:
        name = file.get("name")
        logging.debug("name =" + str(name))
        # An entry without a usable name cannot be a backup; skip it instead
        # of failing on the string operations below.
        if not isinstance(name, str):
            continue
        if "_backup" not in name or file.get('explicitlyTrashed'):
            continue
        date_str = name.split("_")[0]
        try:
            date = datetime.datetime.strptime(date_str, '%d-%m-%Y-%H-%M-%S')
        except ValueError as e:
            logging.info("Error while parsing date : " + str(e) + " (file name : " + name + ". Ignoring this file")
            continue
        # Keep the first candidate, then only strictly newer ones.
        if last_backup_file_date is None or date > last_backup_file_date:
            last_backup_file_date = date
            last_backup_file = file
    if last_backup_file is None:
        logging.info("No backup file found in the drive (last_backup_file is None)")
        raise FileNotFound("No backup file found")
    return last_backup_file, last_backup_file_date
1✔
75

76
 
77
def delete_user_from_database(user_id):
    """Remove every record tied to ``user_id``, then the user itself.

    Each repository is instantiated and called in the order listed below;
    the user row is deleted last.

    Args:
        user_id: identifier of the user to remove.
    """
    # (repository class, name of the method that removes the user's rows).
    # NOTE(review): the order is kept exactly as it was - presumably the
    # dependent rows must go before the user row; confirm before reordering.
    cleanup_steps = (
        (Oauth_tokens_repo, "delete"),
        (GoogleDriveIntegration_repo, "delete"),
        (Preferences_repo, "delete"),
        (TOTP_secret_repo, "delete_all"),
        (BackupConfigurationRepo, "delete"),
        (ZKE_encryption_key_repo, "delete"),
        (EmailVerificationToken, "delete"),
        (RateLimitingRepo, "flush_by_user_id"),
        (SessionTokenRepo, "delete_by_user_id"),
        (RefreshTokenRepo, "delete_by_user_id"),
        (User_repo, "delete"),
    )
    for repo_class, method_name in cleanup_steps:
        getattr(repo_class(), method_name)(user_id)
    logging.info("User " + str(user_id) + " deleted from database")
1✔
90

91

92
def generate_new_email_verification_token(user_id):
    """Store a fresh e-mail verification token for ``user_id`` and return it.

    Any token already stored for the user is deleted first. The new token
    is 5 random bytes rendered as 10 hexadecimal characters, stored with an
    expiration 30 minutes after the current time.

    Args:
        user_id: identifier of the user the token belongs to.

    Returns:
        The newly generated token string.
    """
    repo = EmailVerificationToken()
    repo.delete(user_id)
    new_token = os.urandom(5).hex()
    # NOTE(review): utcnow() returns a naive datetime and .timestamp()
    # interprets naive values as local time - confirm the code comparing
    # this expiration makes the same assumption.
    expires_at = datetime.datetime.utcnow() + datetime.timedelta(minutes=30)
    repo.add(user_id, new_token, expires_at.timestamp())
    return new_token
1✔
99

100
def send_information_email(ip, email, reason):
    """Send an informational e-mail about an event on the account.

    The message carries the reason, the current UTC date and the IP as
    rendered by ``get_geolocation``. A failure while sending is logged and
    not propagated.

    Args:
        ip: IP address associated with the event.
        email: recipient address.
        reason: reason forwarded to the e-mail sender.
    """
    logging.info("".join(str(part) for part in (reason, ip, email)))
    sent_at = datetime.datetime.utcnow().strftime("%d/%m/%Y %H:%M:%S") + " UTC"
    located_ip = get_geolocation(ip)
    try:
        send_email.send_information_email(
            email,
            reason=reason,
            date=sent_at,
            ip=located_ip,
        )
    except Exception as e:
        # Best effort: a mail failure must not break the caller.
        logging.error("Unknown error while sending information email" + str(e))
1✔
109

110
def get_geolocation(ip):
    """Render ``ip`` with its approximate location for display.

    Args:
        ip: IP address, as text or as an ``ipaddress`` object.

    Returns:
        - ``""`` when ``ip`` cannot be parsed, or when it is a private
          address (to avoid leaking private IPs);
        - the IP alone when geolocation is disabled, when the lookup fails,
          or when the database holds no city/region/country for it;
        - otherwise ``"ip (city, region, country)"``, where parts unknown
          to the database are left out.
    """
    logging.info("Getting geolocation for ip " + str(ip))
    try:
        ip = ipaddress.ip_address(ip)
    except Exception as e:
        logging.warning("Error while parsing ip address for geolocation : " + str(e))
        return ""
    if ip.is_private:
        logging.error("IP address is private, not getting geolocation to avoid leaking private IPs. This can happen if the server is behind a proxy that does not set the X-Forwarded-For header. Please refer to the documentation to configure a trusted proxy. IP : " + str(ip))
        return ""
    if conf.features.ip_geolocation.enabled == False:
        logging.info("IP Geolocation is disabled. Aborting geolocation for ip " + str(ip))
        return str(ip)
    result = str(ip)
    try:
        with geoip2.database.Reader(conf.features.ip_geolocation.geoip_database_path) as reader:
            geo = reader.city(ip)
            # Fall back on the registered country when the country is unknown.
            country = geo.country.name
            if country is None:
                country = geo.registered_country.name
            known_parts = [
                part
                for part in (geo.city.name, geo.subdivisions.most_specific.name, country)
                if part
            ]
            # Joining only the known parts avoids dangling separators such as
            # "(Paris, )" and empty parentheses.
            if known_parts:
                result = f"{ip} ({', '.join(known_parts)})"
    except Exception as e:
        # Best effort: on any lookup failure, fall back on the bare IP.
        logging.error("Error while getting geolocation for ip " + str(ip) + " : " + str(e))
    return result
1✔
141

142
def _first_forwarded_ip(header_value):
    """Extract an IP address from an ``X-Forwarded-For`` header value.

    An IPv6 address found anywhere in the value takes priority; otherwise
    the first dotted-quad IPv4-looking token is used.

    Returns:
        The address text, or ``None`` when the value holds no address.
    """
    ipv6_match = re.search(r'\s*((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])(\.(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])(\.(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])(\.(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])(\.(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])(\.(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])(\.(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])(\.(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])){3}))|:)))(%.+)?\s*', header_value)
    if ipv6_match is not None:
        # Group 1 is the bare address. The full match may also carry
        # surrounding whitespace and a "%zone" suffix, which would make the
        # later ip_address() parsing reject an otherwise valid address.
        return ipv6_match.group(1)
    ipv4_candidates = re.findall(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', header_value)
    if ipv4_candidates:
        return ipv4_candidates[0]
    return None


def get_ip(request):
    """Determine the client IP address of ``request``.

    When the peer address belongs to one of the networks in
    ``conf.api.trusted_proxy``, the address is read from the
    ``X-Forwarded-For`` header; otherwise the peer address itself is used.
    Private addresses are never returned.

    Args:
        request: object exposing ``remote_addr`` and ``headers``.

    Returns:
        The client IP as a string, or ``None`` when it cannot be determined
        (unparsable or private address, or header missing behind a trusted
        proxy). Every ``None`` outcome is logged as an error.
    """
    try:
        with app.app.app_context():
            remote_ip = ipaddress.ip_address(request.remote_addr)
    except Exception as e:
        logging.error("Error while getting remote ip address : " + str(e))
        return None

    trusted_networks = conf.api.trusted_proxy
    is_remote_ip_a_trusted_proxy = trusted_networks is not None and any(
        remote_ip in ip_network for ip_network in trusted_networks
    )

    if not is_remote_ip_a_trusted_proxy:
        if not remote_ip.is_private:
            return str(remote_ip)
        logging.error("Could not get ip address from request. The remote IP was NOT a trusted proxy. Remote ip : " + str(remote_ip))
        return None

    if "X-Forwarded-For" not in request.headers:
        logging.error("Could not get ip address from request. The request was made through a trusted proxy but the X-Forwarded-For header was not set.")
        return None

    try:
        forwarded_ip = _first_forwarded_ip(request.headers["X-Forwarded-For"])
        if forwarded_ip is None:
            logging.error("Could not get ip address from request. No IPv6 or IPv4 found in the header. Forwarded ip : " + str(request.headers["X-Forwarded-For"]))
            return None
        # ip_address() raises ValueError for tokens that merely look like an
        # address (e.g. 999.1.1.1); that case is handled by the except below.
        if not ipaddress.ip_address(forwarded_ip).is_private:
            return forwarded_ip
        logging.error("Could not get ip address from request. The forwarded IP was not a valid ip address. Test didn't pass, IP very likely to be private. Forwarded ip : " + str(forwarded_ip))
        return None
    except Exception as e:
        logging.error("Could not get ip address from request. Error while parsing forwarded ip : " + str(e) + ". Forwarded ip : " + str(request.headers["X-Forwarded-For"]))
        return None
1✔
194

195
def unsafe_json_vault_validation(json:str) -> tuple:
    """Check the shape of a submitted vault.

    The vault must be an object whose keys all look like UUIDs
    (8-4-4-4-12 hexadecimal digits) and whose values are all strings.
    Inputs whose ``len`` exceeds 4 * 1024 * 1024 are rejected up front.

    NOTE(review): the schema expects an object, so callers presumably pass
    the decoded dict rather than a JSON string, in which case ``len`` counts
    entries rather than bytes - confirm against callers.

    Args:
        json: the vault to validate.

    Returns:
        ``(True, "OK")`` when the vault is valid, otherwise ``(False,
        message)`` with a user-facing explanation. Never raises for an
        invalid vault.
    """
    if len(json) > 4 * 1024 *1024:
        return False, "The vault is too big. The maximum size is 4MB"
    schema = {
        "type": "object",
        "patternProperties": {
            "^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$": {"type": "string"},
        },
        "additionalProperties": False
    }
    try:
        validate(json, schema)
    except Exception as e:
        # Any failure counts as "invalid"; the detail goes to the log only,
        # not to stdout or to the caller.
        logging.error("Error while validating vault json : " + str(e))
        return False, "The vault submitted is invalid. If you submitted this vault through the web interface, please report this issue to the support."
    return True, "OK"
1✔
212

213

214
def revoke_session(session_id=None, refresh_id=None):
    """Revoke a session token, a refresh token, and their counterparts.

    Each token that is found is revoked. When the two tokens given are not
    linked to each other (or only one was found), the token linked to the
    revoked one is looked up and revoked as well, if it exists.

    Args:
        session_id: id of the session token to revoke, if any.
        refresh_id: id of the refresh token to revoke, if any.

    Returns:
        Always ``True``.
    """
    logging.info(f"Revoking session {session_id} and refresh {refresh_id}")
    session_repo = SessionTokenRepo()
    refresh_repo = RefreshTokenRepo()
    session = session_repo.get_session_token_by_id(session_id)
    refresh = refresh_repo.get_refresh_token_by_id(refresh_id)
    if session is not None:
        session_repo.revoke(session.id)
        logging.info(f"Revoked session {session.id}")
        if not refresh or refresh.session_token_id != session.id:
            associated_refresh = refresh_repo.get_refresh_token_by_session_id(session.id)
            # Revoke and log only when a linked refresh token actually exists.
            if associated_refresh is not None:
                refresh_repo.revoke(associated_refresh.id)
                logging.info(f"Revoked refresh {associated_refresh.id} because the associated session {session.id} was revoked")
    if refresh is not None:
        refresh_repo.revoke(refresh.id)
        logging.info(f"Revoked refresh {refresh.id}")
        if not session or session.id != refresh.session_token_id:
            associated_session = session_repo.get_session_token_by_id(refresh.session_token_id)
            # The linked session may be missing; do not dereference it then.
            if associated_session is not None:
                session_repo.revoke(associated_session.id)
                logging.info(f"Revoked session {associated_session.id} because the associated refresh {refresh.id} was revoked")
    return True
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc