• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Open-MSS / MSS / 23642212528

27 Mar 2026 10:34AM UTC coverage: 69.806% (-0.03%) from 69.834%
23642212528

Pull #3033

github

web-flow
Merge c843548b3 into 29ba126c7
Pull Request #3033: refactor server.py for using Blueprint

598 of 993 new or added lines in 15 files covered. (60.22%)

23 existing lines in 6 files now uncovered.

14588 of 20898 relevant lines covered (69.81%)

0.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.14
/mslib/utils/auth.py
1
# -*- coding: utf-8 -*-
2
"""
3

4
    mslib.utils.auth
5
    ~~~~~~~~~~~~~~~~
6

7
    handles passwords from the keyring for login and http_auth
8

9

10
    To better understand the code, look at the 'ships' example from
11
    chapter 14/16 of 'Rapid GUI Programming with Python and Qt: The
12
    Definitive Guide to PyQt Programming' (Mark Summerfield).
13

14
    This file is part of MSS.
15

16
    :copyright: Copyright 2023 Reimar Bauer
17
    :copyright: Copyright 2023-2026 by the MSS team, see AUTHORS.
18
    :license: APACHE-2.0, see LICENSE for details.
19

20
    Licensed under the Apache License, Version 2.0 (the "License");
21
    you may not use this file except in compliance with the License.
22
    You may obtain a copy of the License at
23

24
       http://www.apache.org/licenses/LICENSE-2.0
25

26
    Unless required by applicable law or agreed to in writing, software
27
    distributed under the License is distributed on an "AS IS" BASIS,
28
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
29
    See the License for the specific language governing permissions and
30
    limitations under the License.
31
"""
32
import datetime
1✔
33
import functools
1✔
34
import logging
1✔
35

36
import email_validator
1✔
37
import keyring
1✔
38
import sqlalchemy
1✔
39
from flask import request, abort, g
1✔
40
from flask_mail import Message
1✔
41
from itsdangerous import URLSafeTimedSerializer, BadSignature
1✔
42

43
from mslib.mscolab.app import APP
1✔
44
from mslib.mscolab.conf import setup_saml2_backend
1✔
45
from mslib.mscolab.models import User
1✔
46

47
try:
    from jeepney.wrappers import DBusErrorResponse
except ImportError:
    # ModuleNotFoundError is a subclass of ImportError, so this single
    # handler also covers a completely missing jeepney package.
    class DBusErrorResponse(Exception):
        """
        Fallback definition for systems without DBus support.

        Keeps ``except DBusErrorResponse`` clauses in this module valid when
        jeepney cannot be imported.
        """
        def __init__(self, message):
            super().__init__(message)
×
56

57
from mslib.msui import constants
1✔
58

59

60
# Default ``service_name`` of the keyring helpers below: this module's import name.
NAME = __name__
1✔
61

62

63
def del_password_from_keyring(service_name=NAME, username=""):
    """
    Removes the entry stored for username and service_name from the keyring.

    Nothing is done when username is empty or consists only of whitespace.
    A missing keyring backend or a failed deletion is logged as a warning
    and not raised to the caller.

    :param service_name: keyring service the entry is stored under
    :param username: name of the entry to remove
    """
    if username.strip() == "":
        return
    try:
        keyring.delete_password(service_name=service_name, username=username)
    except (keyring.errors.NoKeyringError, keyring.errors.PasswordDeleteError) as ex:
        # Pass ex as a logging argument so interpolation only happens when the
        # record is actually emitted; the resulting text is unchanged.
        logging.warning("Can't use Keyring on your system: %s", ex)
×
72

73

74
def get_password_from_keyring(service_name=NAME, username=""):
    """
    Looks up the password stored for username and service_name in the keyring.

    The result is used to fill in a password form field, so credentials that do
    not exist in the keyring are reported as an empty string.

    :param service_name: keyring service the entry is stored under
    :param username: name of the entry to look up
    :return: the stored password; "" when the keyring has no entry for
             username; None when username is blank or when the keyring is
             locked, failed to initialise or raised a DBus error
    """
    if username.strip() == "":
        return None
    try:
        cred = keyring.get_credential(service_name=service_name, username=username)
        # username cannot be None here (strip() was already called on it), so a
        # missing credential always maps to the empty string.
        if cred is None:
            return ""
        return cred.password
    except (keyring.errors.KeyringLocked, keyring.errors.InitError, DBusErrorResponse) as ex:
        logging.warning(ex)
        return None
×
91

92

93
def save_password_to_keyring(service_name=NAME, username="", password=""):
    """
    Saves username and password for a given service_name in the keyring.

    Nothing is stored when username or password is empty or consists only of
    whitespace. A missing keyring backend is logged at info level and not
    raised to the caller.

    :param service_name: keyring service to store the entry under
    :param username: name of the entry
    :param password: password to store for username
    """
    if "" in (username.strip(), password.strip()):
        return
    try:
        keyring.set_password(service_name=service_name, username=username, password=password)
    except keyring.errors.NoKeyringError as ex:
        # Pass ex as a logging argument so interpolation only happens when the
        # record is actually emitted; the resulting text is unchanged.
        logging.info("Can't use Keyring on your system: %s", ex)
×
102

103

104
def get_auth_from_url_and_name(server_url, http_auth, overwrite_login_cache=True):
    """
    Gets auth_username from http_auth and password from keyring for a given server_url.

    :param server_url: url used as key in http_auth, in the keyring and in
                       constants.AUTH_LOGIN_CACHE
    :param http_auth: mapping of server url to auth username
    :param overwrite_login_cache: when True, a non-blank password found in the
                                  keyring replaces the cached login of server_url
    :return: the cached (username, password) tuple of server_url if there is
             one, otherwise (username, None); username is None when http_auth
             has no entry, or an empty one, for server_url
    """
    auth_name = None
    # Direct lookup instead of scanning every item for an equal key.
    if server_url in http_auth:
        auth_name = http_auth[server_url]
        try:
            password = get_password_from_keyring(service_name=server_url, username=auth_name)
        except keyring.errors.NoKeyringError as ex:
            password = None
            logging.info("Can't use Keyring on your system: %s", ex)
        if overwrite_login_cache and password is not None and password.strip() != "":
            constants.AUTH_LOGIN_CACHE[server_url] = (auth_name, password)
        if auth_name == "":
            # an empty configured name is reported like a missing one
            auth_name = None
    return constants.AUTH_LOGIN_CACHE.get(server_url, (auth_name, None))
1✔
124

125

126
def check_login(emailid, password):
    """
    Checks the credentials of an account.

    :param emailid: email address identifying the account
    :param password: password to verify
    :return: the matching User when the password verifies (and, if
             APP.config['MAIL_ENABLED'] is set, the account is confirmed);
             False otherwise, including when the database query fails with an
             OperationalError
    """
    try:
        user = User.query.filter_by(emailid=str(emailid)).first()
    except sqlalchemy.exc.OperationalError as ex:
        # The placeholder used to be "%ex": "%e" is the float conversion, so
        # formatting the record failed and the database error was never logged.
        logging.debug("Problem in the database (%s), likely version client different", ex)
        return False
    if user is None:
        return False
    if APP.config['MAIL_ENABLED'] and not user.confirmed:
        # with mail verification enabled, unconfirmed accounts cannot log in
        return False
    if user.verify_password(password):
        return user
    return False
1✔
141

142

143
def register_user(email, password, username, fullname):
    """
    Validates the given account data and creates a new user from it.

    The checks run in this order: email and username not empty, email syntax,
    no "@" in username, email not yet taken, username not yet taken.

    :param email: email address of the new account
    :param password: password of the new account
    :param username: username of the new account, must not contain "@"
    :param fullname: full name passed on to the User constructor
    :return: dict with the key "success"; when a check fails "success" is False
             and "message" holds the reason
    """
    def rejected(message):
        return {"success": False, "message": message}

    if not str(email.strip()) or not str(username.strip()):
        return rejected("Your username or email cannot be empty")
    try:
        # ToDo verify what changed for check_deliverability
        email_validator.validate_email(email, check_deliverability=APP.config['MAIL_ENABLED'])
    except email_validator.exceptions.EmailSyntaxError:
        return rejected("Your email ID is not valid!")
    if "@" in username:
        return rejected("Your username cannot contain @ symbol!")
    if User.query.filter_by(emailid=str(email)).first():
        return rejected("This email ID is already taken!")
    if User.query.filter_by(username=str(username)).first():
        return rejected("This username is already registered")

    from mslib.mscolab.server import getConfig
    manager = getConfig()[3]
    new_user = User(email, username, password, fullname)
    return {"success": manager.modify_user(new_user, action="create")}
1✔
165

166

167
def generate_confirmation_token(email):
    """
    Serializes email into a token with URLSafeTimedSerializer.

    The serializer is keyed with APP.config['SECRET_KEY'] and salted with
    APP.config['SECURITY_PASSWORD_SALT'].

    :param email: value to put into the token
    :return: the serialized token
    """
    signer = URLSafeTimedSerializer(APP.config['SECRET_KEY'])
    salt = APP.config['SECURITY_PASSWORD_SALT']
    return signer.dumps(email, salt=salt)
×
170

171

172
def send_email(to, subject, template):
    """
    Sends an html mail to a single recipient.

    Without a configured APP.config['MAIL_DEFAULT_SENDER'] only a debug message
    is logged. An IOError while sending is logged as an error and not raised.

    :param to: recipient address
    :param subject: subject of the mail
    :param template: html body of the mail
    """
    sender = APP.config['MAIL_DEFAULT_SENDER']
    if sender is None:
        logging.debug("setup user verification by email")
        return

    msg = Message(subject, recipients=[to], html=template, sender=sender)
    try:
        from mslib.mscolab.server import getConfig
        getConfig()[4].send(msg)
    except IOError:
        logging.error("Can't send email to %s", to)
×
188

189

190
def confirm_token(token, expiration=3600):
    """
    Loads the value stored in a token made by URLSafeTimedSerializer.

    :param token: token to read
    :param expiration: handed to the serializer as max_age
    :return: the value loaded from the token, or False when loading raises
             IOError or BadSignature
    """
    signer = URLSafeTimedSerializer(APP.config['SECRET_KEY'])
    try:
        return signer.loads(token, salt=APP.config['SECURITY_PASSWORD_SALT'], max_age=expiration)
    except (IOError, BadSignature):
        return False
×
201

202

203
def get_idp_entity_id(selected_idp):
    """
    Finds the entity_id from the configured IDPs

    :param selected_idp: value compared with 'idp_identity_name' of each configured IDP
    :return: the first identity provider listed in the metadata of the matching
             IDP's saml2client, or None when no configured IDP matches
    """
    for idp_config in setup_saml2_backend.CONFIGURED_IDPS:
        if selected_idp != idp_config['idp_identity_name']:
            continue
        providers = idp_config['idp_data']['saml2client'].metadata.identity_providers()
        return providers[0]
    return None
×
215

216

217
def create_or_update_idp_user(email, username, token, authentication_backend):
    """
    Creates a new idp user, or updates the existing user with the given email.

    A new user is created as already confirmed. For an existing user the
    authentication backend is replaced and the token is hashed as its password.

    :param email: idp users email
    :param username: idp users username
    :param token: authentication token
    :param authentication_backend: authenticated identity providers name
    :return: the result of modify_user for the "create" or "update_idp_user" action
    """
    from mslib.mscolab.server import getConfig
    manager = getConfig()[3]
    existing = User.query.filter_by(emailid=email).first()
    if existing:
        existing.authentication_backend = authentication_backend
        existing.hash_password(token)
        return manager.modify_user(existing, action="update_idp_user")

    # using an IDP for a new account/profile, e-mail is already verified by the IDP
    one_second = datetime.timedelta(seconds=1)
    confirmed_on = datetime.datetime.now(tz=datetime.timezone.utc) + one_second
    new_user = User(email, username, password=token, confirmed=True, confirmed_on=confirmed_on,
                    authentication_backend=authentication_backend)
    return manager.modify_user(new_user, action="create")
×
241

242

243
def verify_user(func):
    """
    Decorator that only calls func for requests carrying a valid user token.

    The token is read from the request args, falling back to the request form.
    A TypeError while verifying it aborts with 404. When no user is found, or
    APP.config['MAIL_ENABLED'] is set and the user is not confirmed, the string
    "False" is returned. Otherwise the user is stored in flask.g and the result
    of func is returned.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            user = User.verify_auth_token(request.args.get('token', request.form.get('token', False)))
        except TypeError:
            logging.debug("no token in request form")
            abort(404)
        if not user:
            return "False"
        if APP.config['MAIL_ENABLED'] and not user.confirmed:
            return "False"
        # saving user details in flask.g
        g.user = user
        return func(*args, **kwargs)
    return wrapper
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc