• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Open-MSS / MSS / 10653123390

01 Sep 2024 10:12AM UTC coverage: 69.967% (-0.07%) from 70.037%
10653123390

Pull #2495

github

web-flow
Merge d3a10b8f0 into 0b95679f6
Pull Request #2495: remove the conda/mamba based updater.

24 of 41 new or added lines in 5 files covered. (58.54%)

92 existing lines in 6 files now uncovered.

13843 of 19785 relevant lines covered (69.97%)

0.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.17
/mslib/mscolab/server.py
1
# -*- coding: utf-8 -*-
2
"""
3

4
    mslib.mscolab.server
5
    ~~~~~~~~~~~~~~~~~~~~
6

7
    Server for mscolab module
8

9
    This file is part of MSS.
10

11
    :copyright: Copyright 2019 Shivashis Padhi
12
    :copyright: Copyright 2019-2024 by the MSS team, see AUTHORS.
13
    :license: APACHE-2.0, see LICENSE for details.
14

15
    Licensed under the Apache License, Version 2.0 (the "License");
16
    you may not use this file except in compliance with the License.
17
    You may obtain a copy of the License at
18

19
       http://www.apache.org/licenses/LICENSE-2.0
20

21
    Unless required by applicable law or agreed to in writing, software
22
    distributed under the License is distributed on an "AS IS" BASIS,
23
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
24
    See the License for the specific language governing permissions and
25
    limitations under the License.
26
"""
27
import fs
1✔
28
import sys
1✔
29
import functools
1✔
30
import json
1✔
31
import logging
1✔
32
import datetime
1✔
33
import secrets
1✔
34
import socketio
1✔
35
import sqlalchemy.exc
1✔
36
import werkzeug
1✔
37
import flask_migrate
1✔
38

39
from itsdangerous import URLSafeTimedSerializer, BadSignature
1✔
40
from flask import g, jsonify, request, render_template, flash
1✔
41
from flask import send_from_directory, abort, url_for, redirect
1✔
42
from flask_mail import Mail, Message
1✔
43
from flask_cors import CORS
1✔
44
from flask_httpauth import HTTPBasicAuth
1✔
45
from validate_email import validate_email
1✔
46
from saml2.metadata import create_metadata_string
1✔
47
from saml2 import BINDING_HTTP_REDIRECT, BINDING_HTTP_POST
1✔
48
from flask.wrappers import Response
1✔
49

50
from mslib.mscolab.conf import mscolab_settings, setup_saml2_backend
1✔
51
from mslib.mscolab.models import Change, MessageType, User
1✔
52
from mslib.mscolab.sockets_manager import _setup_managers
1✔
53
from mslib.mscolab.utils import create_files, get_message_dict
1✔
54
from mslib.utils import conditional_decorator
1✔
55
from mslib.index import create_app
1✔
56
from mslib.mscolab.forms import ResetRequestForm, ResetPasswordForm
1✔
57
from mslib.mscolab import migrations
1✔
58

59

60
def _alembic_version_has_revision(db):
    """
    Check whether the alembic_version table contains a revision identifier

    :param db: the database object imported from mslib.mscolab.models
    :return: True if the alembic_version table has at least one row, False otherwise
    """
    return db.session.execute(sqlalchemy.text("SELECT * FROM alembic_version")).first() is not None


def _copy_tables(source_metadata, target_metadata, src_connection, target_connection):
    """
    Copy every row of every reflected source table, except alembic_version, into the target database

    :param source_metadata: reflected metadata of the database to migrate from
    :param target_metadata: reflected metadata of the database to migrate to
    :param src_connection: open connection to the source database
    :param target_connection: open connection to the target database; not committed here
    """
    for table in source_metadata.sorted_tables:
        if table.name == "alembic_version":
            # Do not migrate the alembic_version table!
            continue
        logging.debug("Copying table %s", table.name)
        stmt = target_metadata.tables[table.name].insert()
        for row in src_connection.execute(table.select()):
            logging.debug("Copying row %s", row)
            # Every datetime value is stamped with UTC before it is written to the target
            row = tuple(
                r.replace(tzinfo=datetime.timezone.utc) if isinstance(r, datetime.datetime) else r for r in row
            )
            target_connection.execute(stmt.values(row))


def _fix_postgresql_sequences(target_connection):
    """
    Reset the auto-increment sequences of a PostgreSQL database to the current column maxima

    For reference, see: https://wiki.postgresql.org/wiki/Fixing_Sequences

    :param target_connection: open connection to the PostgreSQL target database; committed at the end
    """
    logging.info("Using a PostgreSQL database, will fix up sequences")
    # The query generates one "SELECT SETVAL(...)" statement per sequence, which is then executed below
    cur = target_connection.execute(sqlalchemy.text(r"""
SELECT
    'SELECT SETVAL(' ||
    quote_literal(quote_ident(sequence_namespace.nspname) || '.' || quote_ident(class_sequence.relname)) ||
    ', COALESCE(MAX(' ||quote_ident(pg_attribute.attname)|| '), 1) ) FROM ' ||
    quote_ident(table_namespace.nspname)|| '.'||quote_ident(class_table.relname)|| ';'
FROM pg_depend
    INNER JOIN pg_class AS class_sequence
        ON class_sequence.oid = pg_depend.objid
            AND class_sequence.relkind = 'S'
    INNER JOIN pg_class AS class_table
        ON class_table.oid = pg_depend.refobjid
    INNER JOIN pg_attribute
        ON pg_attribute.attrelid = class_table.oid
            AND pg_depend.refobjsubid = pg_attribute.attnum
    INNER JOIN pg_namespace as table_namespace
        ON table_namespace.oid = class_table.relnamespace
    INNER JOIN pg_namespace AS sequence_namespace
        ON sequence_namespace.oid = class_sequence.relnamespace
ORDER BY sequence_namespace.nspname, class_sequence.relname;
"""))
    for stmt, in cur.all():
        target_connection.execute(sqlalchemy.text(stmt))
    target_connection.commit()


def _migrate_existing_data():
    """
    Migrate the data of the database configured in SQLALCHEMY_DB_URI_TO_MIGRATE_FROM into the target database

    The target schema is first upgraded to the revision matching the detected source version,
    then all rows are copied over.
    """
    logging.info("The target database is empty and a database to migrate from is set, starting the data migration")
    source_engine = sqlalchemy.create_engine(mscolab_settings.SQLALCHEMY_DB_URI_TO_MIGRATE_FROM)
    source_metadata = sqlalchemy.MetaData()
    source_metadata.reflect(bind=source_engine)
    # Determine the previous MSColab version based on the database content and upgrade to the corresponding revision
    if "authentication_backend" in source_metadata.tables["users"].columns:
        # It should be v9
        flask_migrate.upgrade(directory=migrations.__path__[0], revision="c171019fe3ee")
    else:
        # It's probably v8
        flask_migrate.upgrade(directory=migrations.__path__[0], revision="92eaba86a92e")
    # Copy over the existing data
    target_engine = sqlalchemy.create_engine(mscolab_settings.SQLALCHEMY_DB_URI)
    target_metadata = sqlalchemy.MetaData()
    target_metadata.reflect(bind=target_engine)
    with source_engine.connect() as src_connection, target_engine.connect() as target_connection:
        _copy_tables(source_metadata, target_metadata, src_connection, target_connection)
        target_connection.commit()
        if target_engine.name == "postgresql":
            # Fix the databases auto-increment sequences, if it is a PostgreSQL database
            _fix_postgresql_sequences(target_connection)
    logging.info("Data migration finished")


def _handle_db_upgrade():
    """
    Bring the MSColab database to the latest schema revision

    Aborts the process if the database has tables but no alembic revision identifier.
    If the database is empty and SQLALCHEMY_DB_URI_TO_MIGRATE_FROM is set, the data of
    that database is migrated first. Finally the schema is upgraded to the latest revision.

    :raises SystemExit: if a schema exists without an alembic_version revision identifier
    """
    from mslib.mscolab.models import db

    create_files()
    inspector = sqlalchemy.inspect(db.engine)
    existing_tables = inspector.get_table_names()

    # The alembic_version table is only queried when it exists, and only once
    has_revision = "alembic_version" in existing_tables and _alembic_version_has_revision(db)
    has_schema = any(name != "alembic_version" for name in existing_tables)

    if has_schema and not has_revision:
        sys.exit(
            """Your database contains no alembic_version revision identifier, but it has a schema. This suggests \
that you have a pre-existing database but haven't followed the database migration instructions. To prevent damage to \
your database MSColab will abort. Please follow the documentation for a manual database migration from MSColab v8/v9."""
        )

    is_empty_database = not has_schema and not has_revision
    # If a database connection to migrate from is set and the target database is empty, then migrate the existing data
    if is_empty_database and mscolab_settings.SQLALCHEMY_DB_URI_TO_MIGRATE_FROM is not None:
        _migrate_existing_data()

    # Upgrade to the latest database revision
    flask_migrate.upgrade(directory=migrations.__path__[0])

    logging.info("Database initialised successfully!")
1✔
147

148

149
# Create the Flask application and make sure its database is on the latest revision
# before any other component is attached to it.
APP = create_app(__name__, imprint=mscolab_settings.IMPRINT, gdpr=mscolab_settings.GDPR)
with APP.app_context():
    _handle_db_upgrade()

mail = Mail(APP)
# Every origin is allowed unless the settings define CORS_ORIGINS
CORS(APP, origins=getattr(mscolab_settings, "CORS_ORIGINS", ["*"]))
auth = HTTPBasicAuth()
1✔
155

156

157
try:
    from mscolab_auth import mscolab_auth
except ImportError as ex:
    logging.warning("Couldn't import mscolab_auth (ImportError: '%s'), creating dummy config.", ex)

    class mscolab_auth:
        """Placeholder configuration used when no mscolab_auth module can be imported."""
        # (username, md5 hex digest of the password) pairs
        allowed_users = [("mscolab", "add_md5_digest_of_PASSWORD_here"),
                         ("add_new_user_here", "add_md5_digest_of_PASSWORD_here")]
        __file__ = None
1✔
166

167
# setup http auth
if mscolab_settings.__dict__.get('enable_basic_http_authentication', False):
    logging.debug("Enabling basic HTTP authentication. Username and "
                  "password required to access the service.")
    import hashlib
    import hmac

    def authfunc(username, password):
        """
        Check a username/password pair against mscolab_auth.allowed_users

        :param username: the user name sent by the client
        :param password: the clear text password sent by the client
        :return: True if a matching (username, md5 digest) entry exists, False otherwise
        """
        if username is None or password is None:
            # e.g. an Authorization header that carries no password
            return False
        digest = hashlib.md5(password.encode('utf-8')).hexdigest().encode('utf-8')
        for u, p in mscolab_auth.allowed_users:
            # compare_digest keeps the comparison time independent of where the digests differ
            if (u == username) and hmac.compare_digest(str(p).encode('utf-8'), digest):
                return True
        return False

    @auth.verify_password
    def verify_pw(username, password):
        """
        Password verification callback registered with the HTTPBasicAuth object

        Credentials from request.authorization take precedence over the passed arguments.

        :param username: user name handed in by the auth extension
        :param password: password handed in by the auth extension
        :return: result of authfunc for the effective credentials
        """
        credentials = request.authorization
        if credentials:
            username = credentials.username
            password = credentials.password
        return authfunc(username, password)
×
186

187

188
def send_email(to, subject, template):
    """
    Send an HTML e-mail if a default sender is configured

    :param to: recipient address
    :param subject: subject line of the message
    :param template: HTML body of the message
    """
    sender = APP.config['MAIL_DEFAULT_SENDER']
    if sender is None:
        logging.debug("setup user verification by email")
        return
    msg = Message(subject, recipients=[to], html=template, sender=sender)
    try:
        mail.send(msg)
    except IOError:
        # a failed delivery is only logged, the caller is not interrupted
        logging.error("Can't send email to %s", to)
×
202

203

204
def generate_confirmation_token(email):
    """
    Serialize an e-mail address into a signed, timestamped token

    :param email: the e-mail address to embed in the token
    :return: the token produced by URLSafeTimedSerializer
    """
    signer = URLSafeTimedSerializer(APP.config['SECRET_KEY'])
    salt = APP.config['SECURITY_PASSWORD_SALT']
    return signer.dumps(email, salt=salt)
×
207

208

209
def confirm_token(token, expiration=3600):
    """
    Decode a token created by generate_confirmation_token

    :param token: the token to decode
    :param expiration: maximum accepted age of the token, passed as max_age
    :return: the decoded e-mail address, or False if loading raised IOError or BadSignature
    """
    signer = URLSafeTimedSerializer(APP.config['SECRET_KEY'])
    try:
        return signer.loads(token, salt=APP.config['SECURITY_PASSWORD_SALT'], max_age=expiration)
    except (IOError, BadSignature):
        return False
×
220

221

222
def _initialize_managers(app):
    """
    Create the managers for the application and attach socketio to it

    :param app: the Flask application
    :return: tuple of (app, socketio object, second and third value returned by _setup_managers)
    """
    socket_io, chat_manager, file_manager = _setup_managers(app)
    # wrap the WSGI application with the socketio middleware, then bind socketio to the app
    app.wsgi_app = socketio.Middleware(socketio.server, app.wsgi_app)
    socket_io.init_app(app)
    return app, socket_io, chat_manager, file_manager


_app, sockio, cm, fm = _initialize_managers(APP)
1✔
232

233

234
def check_login(emailid, password):
    """
    Look up a user by e-mail address and verify the password

    :param emailid: e-mail address of the user
    :param password: password to verify
    :return: the User object on success; False if the user is unknown, the password is wrong,
             the database query failed, or (with MAIL_ENABLED) the account is not confirmed
    """
    try:
        user = User.query.filter_by(emailid=str(emailid)).first()
    except sqlalchemy.exc.OperationalError as ex:
        # "%s" instead of the former "%ex": "%e" is a float conversion and cannot format an exception
        logging.debug("Problem in the database (%s), likely version client different", ex)
        return False
    if user is None:
        return False
    if mscolab_settings.MAIL_ENABLED and not user.confirmed:
        return False
    if user.verify_password(password):
        return user
    return False
1✔
249

250

251
def register_user(email, password, username):
    """
    Validate the registration data and create a new user

    :param email: e-mail address of the new user
    :param password: password of the new user
    :param username: user name of the new user, must not contain "@"
    :return: dict with "success" and, on validation failure, a "message"
    """
    if not str(email.strip()) or not str(username.strip()):
        return {"success": False, "message": "Your username or email cannot be empty"}
    if not validate_email(email):
        return {"success": False, "message": "Your email ID is not valid!"}
    if "@" in username:
        return {"success": False, "message": "Your username cannot contain @ symbol!"}
    if User.query.filter_by(emailid=str(email)).first():
        return {"success": False, "message": "This email ID is already taken!"}
    if User.query.filter_by(username=str(username)).first():
        return {"success": False, "message": "This username is already registered"}
    new_user = User(email, username, password)
    return {"success": fm.modify_user(new_user, action="create")}
1✔
269

270

271
def verify_user(func):
    """
    Decorator that resolves the request token to a user before calling the view

    The token is read from the query arguments, falling back to the form data.
    On success the user is stored in flask.g.user.

    :param func: the view function to protect
    :return: the wrapped view; it returns "False" when the token yields no user or,
             with MAIL_ENABLED, when the user is not confirmed
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            user = User.verify_auth_token(request.args.get('token', request.form.get('token', False)))
        except TypeError:
            logging.debug("no token in request form")
            abort(404)
        if not user:
            return "False"
        if mscolab_settings.MAIL_ENABLED and not user.confirmed:
            return "False"
        # saving user details in flask.g
        g.user = user
        return func(*args, **kwargs)
    return wrapper
1✔
293

294

295
def get_idp_entity_id(selected_idp):
    """
    Look up the entity_id of a configured identity provider

    :param selected_idp: the idp_identity_name to search for in the configured IDPs
    :return: the first identity provider reported by the matching saml2client metadata, or None
    """
    for idp_config in setup_saml2_backend.CONFIGURED_IDPS:
        if selected_idp != idp_config['idp_identity_name']:
            continue
        identity_providers = idp_config['idp_data']['saml2client'].metadata.identity_providers()
        return identity_providers[0]
    return None
×
307

308

309
def create_or_update_idp_user(email, username, token, authentication_backend):
    """
    Create a new idp user or refresh the credentials of an existing one

    :param email: idp users email
    :param username: idp users username
    :param token: authentication token, stored as the users password
    :param authentication_backend: authenticated identity providers name
    :return: result of fm.modify_user for the create or update action
    """
    existing_user = User.query.filter_by(emailid=email).first()
    if existing_user:
        existing_user.authentication_backend = authentication_backend
        existing_user.hash_password(token)
        return fm.modify_user(existing_user, action="update_idp_user")

    # using an IDP for a new account/profile, e-mail is already verified by the IDP
    confirmed_at = datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(seconds=1)
    new_user = User(email, username, password=token, confirmed=True, confirmed_on=confirmed_at,
                    authentication_backend=authentication_backend)
    return fm.modify_user(new_user, action="create")
×
331

332

333
@APP.route('/')
def home():
    """Render the index page of the server."""
    template_name = "/index.html"
    return render_template(template_name)
1✔
336

337

338
@APP.route("/status")
def hello():
    """
    Report the server identification and the configured login options

    :return: JSON string with "message", "use_saml2" and "direct_login"
    """
    # NOTE(review): the former implementation had three branches returning this same payload and
    # called ``auth.login_required()`` without a view function, which only builds a decorator and
    # never checks credentials. The endpoint was therefore always unauthenticated; this is kept.
    # Confirm whether /status is meant to require basic auth when it is enabled.
    return json.dumps({
        'message': "Mscolab server",
        'use_saml2': mscolab_settings.USE_SAML2,
        'direct_login': mscolab_settings.DIRECT_LOGIN
    })
359

360

361
@APP.route('/token', methods=["POST"])
@conditional_decorator(auth.login_required, mscolab_settings.__dict__.get('enable_basic_http_authentication', False))
def get_auth_token():
    """
    Exchange the e-mail address and password from the form for an auth token

    :return: JSON string with "token" and "user" on success, otherwise the string "False"
    """
    emailid = request.form['email']
    password = request.form['password']
    user = check_login(emailid, password)
    if user is False:
        logging.debug("Unauthorized user: %s", emailid)
        return "False"
    if mscolab_settings.MAIL_ENABLED and not user.confirmed:
        return "False"
    token = user.generate_auth_token()
    return json.dumps({
        'token': token,
        'user': {'username': user.username, 'id': user.id}})
1✔
384

385

386
@APP.route('/test_authorized')
def authorized():
    """
    Tell whether the token of the request belongs to a usable account

    :return: "True" or "False" as string
    """
    token = request.args.get('token', request.form.get('token'))
    user = User.verify_auth_token(token)
    if user is None:
        return "False"
    if mscolab_settings.MAIL_ENABLED and user.confirmed is False:
        return "False"
    return "True"
1✔
400

401

402
@APP.route("/register", methods=["POST"])
@conditional_decorator(auth.login_required, mscolab_settings.__dict__.get('enable_basic_http_authentication', False))
def user_register_handler():
    """
    Register a new user from the form data and, with MAIL_ENABLED, send the confirmation mail

    :return: JSON result of register_user with status 200 (not registered), 201 (registered),
             204 (registered, confirmation mail triggered) or 401 (TypeError while processing)
    """
    form = request.form
    email = form['email']
    password = form['password']
    username = form['username']
    outcome = register_user(email, password, username)
    status_code = 200
    try:
        if outcome["success"]:
            if mscolab_settings.MAIL_ENABLED:
                status_code = 204
                token = generate_confirmation_token(email)
                confirm_url = url_for('confirm_email', token=token, _external=True)
                html = render_template('user/activate.html', username=username, confirm_url=confirm_url)
                send_email(email, "MSColab Please confirm your email", html)
            else:
                status_code = 201
    except TypeError:
        outcome, status_code = {"success": False}, 401
    return jsonify(outcome), status_code
1✔
423

424

425
@APP.route('/confirm/<token>')
def confirm_email(token):
    """
    Confirm the e-mail address encoded in the token

    :param token: confirmation token taken from the URL
    :return: the rendered confirmation page, or a JSON failure with status 401 for an invalid token
    """
    if not mscolab_settings.MAIL_ENABLED:
        # Without this the view returned None, which Flask answers with a server error
        abort(404)
    try:
        email = confirm_token(token)
    except TypeError:
        return jsonify({"success": False}), 401
    if email is False:
        return jsonify({"success": False}), 401
    user = User.query.filter_by(emailid=email).first_or_404()
    if not user.confirmed:
        fm.modify_user(user, attribute="confirmed_on", value=datetime.datetime.now(tz=datetime.timezone.utc))
        fm.modify_user(user, attribute="confirmed", value=True)
    return render_template('user/confirmed.html', username=user.username)
×
441

442

443
@APP.route('/user', methods=["GET"])
@verify_user
def get_user():
    """
    Return id and username of the authenticated user

    :return: JSON string of the form {"user": {"id": ..., "username": ...}}
    """
    current_user = g.user
    return json.dumps({'user': {'id': current_user.id, 'username': current_user.username}})
1✔
447

448

449
@APP.route('/upload_profile_image', methods=["POST"])
@verify_user
def upload_profile_image():
    """
    Store an uploaded image as profile image

    Reads "user_id" from the form and "image" from the uploaded files.

    :return: JSON message with status 200 on success, 400 for a missing/invalid file or a failed
             save, 413 if the file exceeds MAX_UPLOAD_SIZE
    """
    import io

    # NOTE(review): user_id is taken from the form and not from g.user - confirm that a user
    # is not supposed to be restricted to their own profile image here.
    user_id = request.form['user_id']
    file = request.files['image']
    if not file:
        return jsonify({'message': 'No file provided or invalid file type'}), 400
    if not file.mimetype.startswith('image/'):
        return jsonify({'message': 'Invalid file type'}), 400

    # content_length of an uploaded part is usually not sent, so measure the stream itself
    stream = file.stream
    stream.seek(0, io.SEEK_END)
    size = stream.tell()
    stream.seek(0)
    if size > mscolab_settings.MAX_UPLOAD_SIZE:
        return jsonify({'message': 'File too large'}), 413

    success, message = fm.save_user_profile_image(user_id, file)
    if success:
        return jsonify({'message': message}), 200
    else:
        return jsonify({'message': message}), 400
×
466

467

468
@APP.route('/fetch_profile_image', methods=["GET"])
@verify_user
def fetch_profile_image():
    """
    Send the profile image of the user given by the form field "user_id"

    :return: the image file from UPLOAD_FOLDER; aborts with 404 if the user or image path is missing
    """
    user_id = request.form['user_id']
    user = User.query.get(user_id)
    if not (user and user.profile_image_path):
        abort(404)
    base_path = mscolab_settings.UPLOAD_FOLDER
    if sys.platform.startswith('win'):
        base_path = base_path.replace('\\', '/')
    filename = user.profile_image_path
    with fs.open_fs(base_path) as _fs:
        return send_from_directory(_fs.getsyspath(""), filename)
×
482

483

484
@APP.route("/delete_own_account", methods=["POST"])
@verify_user
def delete_own_account():
    """
    Delete the account of the authenticated user

    :return: JSON {"success": <result of the delete action>} with status 200
    """
    outcome = fm.modify_user(g.user, action="delete")
    return jsonify({"success": outcome}), 200
1✔
493

494

495
# Chat related routes
496
@APP.route("/messages", methods=["GET"])
@verify_user
def messages():
    """
    Return the chat messages of an operation from a given timestamp on

    :return: JSON {"messages": [...]} for members of the operation, otherwise the string "False"
    """
    user = g.user
    op_id = request.args.get("op_id", request.form.get("op_id", None))
    if not fm.is_member(user.id, op_id):
        return "False"
    timestamp = request.args.get("timestamp", request.form.get("timestamp", "1970-01-01T00:00:00+00:00"))
    return jsonify({"messages": cm.get_messages(op_id, timestamp)})
×
506

507

508
@APP.route("/message_attachment", methods=["POST"])
@verify_user
def message_attachment():
    """
    Upload a file as chat message attachment of an operation and announce it to the clients

    :return: JSON with "success" and either "path" or "message"; the string "False" if the user is
             not a member of the operation or the upload returned no path
    """
    user = g.user
    op_id = request.form.get("op_id", None)
    if not fm.is_member(user.id, op_id):
        # normal use case never gets to this
        return "False"

    file = request.files['file']
    message_type = MessageType(int(request.form.get("message_type")))
    users = fm.fetch_users_without_permission(int(op_id), user.id)
    if users is False or file is None:
        return jsonify({"success": False, "message": "Could not send message. No file uploaded."})

    static_file_path = fm.upload_file(file, subfolder=str(op_id), include_prefix=True)
    if static_file_path is None:
        return "False"
    new_message = cm.add_message(user, static_file_path, op_id, message_type)
    sockio.emit('chat-message-client', json.dumps(get_message_dict(new_message)))
    return jsonify({"success": True, "path": static_file_path})
×
532

533

534
@APP.route('/uploads/<name>/<path:filename>', methods=["GET"])
def uploads(name=None, filename=None):
    """
    Serve a file from the upload folder

    :param name: first path component below UPLOAD_FOLDER
    :param filename: path of the file below that component
    :return: the file as response; aborts with 404 if a part is missing or the path is not safe
    """
    base_path = mscolab_settings.UPLOAD_FOLDER
    if name is None or filename is None:
        abort(404)
    relative_path = werkzeug.security.safe_join("", name, filename)
    if relative_path is None:
        # safe_join returns None for an unsafe path; passing that on would raise a TypeError
        abort(404)
    return send_from_directory(base_path, relative_path)
1✔
542

543

544
# 413: Payload Too Large
545
@APP.errorhandler(413)
def error413(error):
    """
    Answer "Payload Too Large" errors with a JSON message naming the upload limit

    :param error: the error passed in by Flask, not used
    :return: JSON failure message with status 413
    """
    upload_limit = APP.config['MAX_CONTENT_LENGTH'] / 1024 / 1024
    payload = {"success": False, "message": f"File size too large. Upload limit is {upload_limit}MB"}
    return jsonify(payload), 413
×
549

550

551
# File related routes
552
@APP.route('/create_operation', methods=["POST"])
@verify_user
def create_operation():
    """
    Create a new operation from the form data and notify the clients on success

    :return: the string form of the result of fm.create_operation
    """
    form = request.form
    path = form['path']
    content = form.get('content', None)
    description = form.get('description', None)
    category = form.get('category', "default")
    active = form.get('active', "True") == "True"
    last_used = datetime.datetime.now(tz=datetime.timezone.utc)
    user = g.user
    r = str(fm.create_operation(path, description, user, last_used,
                                content=content, category=category, active=active))
    if r == "True":
        token = request.args.get('token', request.form.get('token', False))
        sockio.sm.update_operation_list({"token": token})
    return r
1✔
569

570

571
@APP.route('/get_operation_by_id', methods=['GET'])
@verify_user
def get_operation_by_id():
    """
    Return the content of an operation

    :return: JSON string {"content": ...}, or the string "False" if fm.get_file returned False
    """
    op_id = request.args.get('op_id', request.form.get('op_id', None))
    content = fm.get_file(int(op_id), g.user)
    if content is False:
        return "False"
    return json.dumps({"content": content})
1✔
580

581

582
@APP.route('/get_all_changes', methods=['GET'])
@verify_user
def get_all_changes():
    """
    Return the changes of an operation

    Reads "op_id" and the optional flag "named_version" from the request.

    :return: JSON with "success" and either "changes" or an error "message"
    """
    op_id = request.args.get('op_id', request.form.get('op_id', None))
    named_version = request.args.get('named_version') == "True"
    user = g.user
    result = fm.get_all_changes(int(op_id), user, named_version)
    if result is False:
        # the missing "return" here used to let failures fall through to the success response
        return jsonify({"success": False, "message": "Some error occurred!"})
    return jsonify({"success": True, "changes": result})
1✔
592

593

594
@APP.route('/get_change_content', methods=['GET'])
@verify_user
def get_change_content():
    """
    Return the content belonging to a change

    :return: JSON {"content": ...}, or the string "False" if fm.get_change_content returned False
    """
    raw_ch_id = request.args.get('ch_id', request.form.get('ch_id', 0))
    ch_id = int(raw_ch_id)
    content = fm.get_change_content(ch_id, g.user)
    if content is False:
        return "False"
    return jsonify({"content": content})
1✔
603

604

605
@APP.route('/set_version_name', methods=['POST'])
@verify_user
def set_version_name():
    """
    Set the version name of a change of an operation

    :return: JSON with "success" and a "message"
    """
    form = request.form
    ch_id = int(form.get('ch_id', 0))
    op_id = int(form.get('op_id', 0))
    version_name = form.get('version_name', None)
    outcome = fm.set_version_name(ch_id, op_id, g.user.id, version_name)
    if outcome is False:
        return jsonify({"success": False, "message": "Some error occurred!"})
    return jsonify({"success": True, "message": "Successfully set version name"})
1✔
617

618

619
@APP.route('/authorized_users', methods=['GET'])
@verify_user
def authorized_users():
    """
    List the authorized users of an operation

    :return: JSON string {"users": ...}
    """
    op_id = request.args.get('op_id', request.form.get('op_id', None))
    users = fm.get_authorized_users(int(op_id))
    return json.dumps({"users": users})
1✔
624

625

626
@APP.route('/operations', methods=['GET'])
@verify_user
def get_operations():
    """
    List the operations of the authenticated user

    :return: JSON string {"operations": ...}; archived ones are skipped if "skip_archived" is "True"
    """
    flag = request.args.get('skip_archived', request.form.get('skip_archived', "False"))
    skip_archived = flag == "True"
    operations = fm.list_operations(g.user, skip_archived=skip_archived)
    return json.dumps({"operations": operations})
1✔
632

633

634
@APP.route('/delete_operation', methods=["POST"])
@verify_user
def delete_operation():
    """
    Delete an operation and notify the clients

    :return: JSON with "success" and a "message"
    """
    op_id = int(request.form.get('op_id', 0))
    deleted = fm.delete_operation(op_id, g.user)
    if deleted is False:
        return jsonify({"success": False, "message": "You don't have access for this operation!"})
    sockio.sm.emit_operation_delete(op_id)
    return jsonify({"success": True, "message": "Operation was successfully deleted!"})
1✔
645

646

647
@APP.route('/update_operation', methods=['POST'])
@verify_user
def update_operation():
    """
    Update one attribute of an operation and notify the clients on success

    :return: the string form of the result of fm.update_operation
    """
    form = request.form
    op_id = form.get('op_id', None)
    attribute = form['attribute']
    value = form['value']
    user = g.user
    r = str(fm.update_operation(int(op_id), attribute, value, user))
    if r == "True":
        token = request.args.get('token', request.form.get('token', False))
        sockio.sm.update_operation_list({"token": token})
    return r
1✔
660

661

662
@APP.route('/operation_details', methods=["GET"])
@verify_user
def get_operation_details():
    """
    Return the details of an operation

    :return: JSON string of the details, or the string "False" if the lookup returned False
    """
    op_id = request.args.get('op_id', request.form.get('op_id', None))
    details = fm.get_operation_details(int(op_id), g.user)
    if details is False:
        return "False"
    return json.dumps(details)
1✔
671

672

673
@APP.route('/set_last_used', methods=["POST"])
@verify_user
def set_last_used():
    """
    Set the last_used timestamp of an operation and derive its active flag from it

    The form field "days" gives how many days ago the operation was last used; it is limited
    to the range -99999..99999. Operations older than ARCHIVE_THRESHOLD days become inactive.

    :return: JSON {"success": True} with status 200
    """
    op_id = request.form.get('op_id', None)
    user = g.user
    days_ago = int(request.form.get('days', 0))
    days_ago = max(-99999, min(99999, days_ago))
    operation_id = int(op_id)
    last_used = datetime.datetime.now(tz=datetime.timezone.utc) - datetime.timedelta(days=days_ago)
    fm.update_operation(operation_id, 'last_used', last_used, user)
    if days_ago > mscolab_settings.ARCHIVE_THRESHOLD:
        fm.update_operation(operation_id, "active", False, user)
    else:
        fm.update_operation(operation_id, "active", True, user)
        # the operation list is only re-announced when the operation stays active
        token = request.args.get('token', request.form.get('token', False))
        sockio.sm.update_operation_list({"token": token})
    return jsonify({"success": True}), 200
1✔
694

695

696
@APP.route('/undo_changes', methods=["POST"])
@verify_user
def undo_changes():
    """
    Undo the change given by the form field "ch_id" and announce the file change on success

    :return: the string form of the result of fm.undo_changes
    """
    ch_id = int(request.form.get('ch_id', -1))
    outcome = fm.undo_changes(ch_id, g.user)
    # get op_id from change
    change = Change.query.filter_by(id=ch_id).first()
    if outcome is True:
        sockio.sm.emit_file_change(change.op_id)
    return str(outcome)
×
708

709

710
@APP.route("/creator_of_operation", methods=["GET"])
@verify_user
def get_creator_of_operation():
    """
    Return the user name of the creator of an operation

    :return: JSON with "success" and "username" (status 200) or an error "message" (status 403)
    """
    op_id = request.args.get('op_id', request.form.get('op_id', None))
    creator_name = fm.fetch_operation_creator(op_id, g.user.id)
    if creator_name is not False:
        return jsonify({"success": True, "username": creator_name}), 200
    return jsonify({"success": False, "message": "You don't have access to this data"}), 403
×
719

720

721
@APP.route("/users_without_permission", methods=["GET"])
@verify_user
def get_users_without_permission():
    """
    List the users that have no permission on an operation

    :return: JSON with "success" and "users" (status 200) or an error "message" (status 403)
    """
    op_id = request.args.get('op_id', request.form.get('op_id', None))
    users = fm.fetch_users_without_permission(int(op_id), g.user.id)
    if users is not False:
        return jsonify({"success": True, "users": users}), 200
    return jsonify({"success": False, "message": "You don't have access to this data"}), 403
1✔
731

732

733
@APP.route("/users_with_permission", methods=["GET"])
@verify_user
def get_users_with_permission():
    """
    Return the result of ``fm.fetch_users_with_permission`` for ``op_id``.

    ``op_id`` is read from the query string, falling back to the form data.
    Answers with HTTP 403 when the file manager returns ``False``.
    """
    op_id = request.args.get('op_id', request.form.get('op_id', None))
    users = fm.fetch_users_with_permission(int(op_id), g.user.id)
    if users is not False:
        return jsonify({"success": True, "users": users}), 200
    return jsonify({"success": False, "message": "You don't have access to this data"}), 403
1✔
743

744

745
@APP.route("/add_bulk_permissions", methods=["POST"])
@verify_user
def add_bulk_permissions():
    """
    Grant several users access to an operation in one request.

    Form fields read:
      * ``op_id`` -- id of the operation (converted to int)
      * ``selected_userids`` -- JSON encoded list of user ids; a missing
        field is treated as an empty list
      * ``selected_access_level`` -- access level passed to the file manager

    When ``fm.add_bulk_permission`` succeeds, a new-permission event is
    emitted for every user id, followed by one permissions-updated event.

    :return: JSON response with ``success`` and ``message``
    """
    op_id = int(request.form.get('op_id'))
    # The default has to be a JSON *string*: json.loads() raises TypeError
    # when handed a list, which turned a missing field into an HTTP 500.
    new_u_ids = json.loads(request.form.get('selected_userids', '[]'))
    access_level = request.form.get('selected_access_level')
    user = g.user
    success = fm.add_bulk_permission(op_id, user, new_u_ids, access_level)
    if success:
        for u_id in new_u_ids:
            sockio.sm.emit_new_permission(u_id, op_id)
        sockio.sm.emit_operation_permissions_updated(user.id, op_id)
        return jsonify({"success": True, "message": "Users successfully added!"})

    return jsonify({"success": False, "message": "Some error occurred. Please try again."})
×
760

761

762
@APP.route("/modify_bulk_permissions", methods=["POST"])
@verify_user
def modify_bulk_permissions():
    """
    Change the access level of several users of an operation in one request.

    Form fields read:
      * ``op_id`` -- id of the operation (converted to int)
      * ``selected_userids`` -- JSON encoded list of user ids; a missing
        field is treated as an empty list
      * ``selected_access_level`` -- the new access level

    When ``fm.modify_bulk_permission`` succeeds, an update-permission event is
    emitted for every user id, followed by one permissions-updated event.

    :return: JSON response with ``success`` and ``message``
    """
    op_id = int(request.form.get('op_id'))
    # The default has to be a JSON *string*: json.loads() raises TypeError
    # when handed a list, which turned a missing field into an HTTP 500.
    u_ids = json.loads(request.form.get('selected_userids', '[]'))
    new_access_level = request.form.get('selected_access_level')
    user = g.user
    success = fm.modify_bulk_permission(op_id, user, u_ids, new_access_level)
    if success:
        for u_id in u_ids:
            sockio.sm.emit_update_permission(u_id, op_id, access_level=new_access_level)
        sockio.sm.emit_operation_permissions_updated(user.id, op_id)
        return jsonify({"success": True, "message": "User permissions successfully updated!"})

    return jsonify({"success": False, "message": "Some error occurred. Please try again."})
×
777

778

779
@APP.route("/delete_bulk_permissions", methods=["POST"])
@verify_user
def delete_bulk_permissions():
    """
    Revoke the access of several users to an operation in one request.

    Form fields read:
      * ``op_id`` -- id of the operation (converted to int)
      * ``selected_userids`` -- JSON encoded list of user ids; a missing
        field is treated as an empty list

    When ``fm.delete_bulk_permission`` succeeds, a revoke-permission event is
    emitted for every user id, followed by one permissions-updated event.

    :return: JSON response with ``success`` and ``message``
    """
    op_id = int(request.form.get('op_id'))
    # The default has to be a JSON *string*: json.loads() raises TypeError
    # when handed a list, which turned a missing field into an HTTP 500.
    u_ids = json.loads(request.form.get('selected_userids', '[]'))
    user = g.user
    success = fm.delete_bulk_permission(op_id, user, u_ids)
    if success:
        for u_id in u_ids:
            sockio.sm.emit_revoke_permission(u_id, op_id)
        sockio.sm.emit_operation_permissions_updated(user.id, op_id)
        return jsonify({"success": True, "message": "User permissions successfully deleted!"})

    return jsonify({"success": False, "message": "Some error occurred. Please try again."})
×
793

794

795
@APP.route('/import_permissions', methods=['POST'])
@verify_user
def import_permissions():
    """
    Copy the permissions of operation ``import_op_id`` onto ``current_op_id``.

    ``fm.import_permissions`` reports which users were added, modified or
    deleted; each of them gets the matching socket event. Afterwards the
    operation list is refreshed for the requesting token and a
    permissions-updated event is emitted.

    :return: JSON response with ``success`` and, on failure, ``message``
    """
    import_op_id = int(request.form.get('import_op_id'))
    current_op_id = int(request.form.get('current_op_id'))
    user = g.user
    success, users, message = fm.import_permissions(import_op_id, current_op_id, user.id)
    if not success:
        return jsonify({"success": False,
                        "message": message})

    notifications = (
        ("add_users", sockio.sm.emit_new_permission),
        # changes navigation for viewer/collaborator
        ("modify_users", sockio.sm.emit_update_permission),
        # invalidate waypoint table, title of windows
        ("delete_users", sockio.sm.emit_revoke_permission),
    )
    for group, emit in notifications:
        for u_id in users[group]:
            emit(u_id, current_op_id)

    token = request.args.get('token', request.form.get('token', False))
    sockio.sm.update_operation_list({"token": token})

    sockio.sm.emit_operation_permissions_updated(user.id, current_op_id)
    return jsonify({"success": True})
821

822

823
@APP.route('/reset_password/<token>', methods=['GET', 'POST'])
def reset_password(token):
    """
    Let a user choose a new password via the link carrying ``token``.

    The token is checked with ``confirm_token`` (expiration 86400 seconds).
    A ``TypeError`` from that check yields a JSON 401 response; a ``False``
    result renders the status page offering to resend the email.
    On a valid form submission the password is hashed and the user is marked
    as confirmed; otherwise the reset form is rendered.
    """
    try:
        email = confirm_token(token, expiration=86400)
    except TypeError:
        return jsonify({"success": False}), 401

    if email is False:
        flash("Sorry, your token has expired or is invalid! We will need to resend your authentication email",
              'category_info')
        resend_link = {"path": "reset_request", "name": "Resend authentication email"}
        return render_template('user/status.html', uri=resend_link)

    user = User.query.filter_by(emailid=email).first_or_404()
    form = ResetPasswordForm()
    if not form.validate_on_submit():
        return render_template('user/reset_password.html', form=form)

    try:
        user.hash_password(form.confirm_password.data)
        fm.modify_user(user, "confirmed", True)
        flash('Password reset Success. Please login by the user interface.', 'category_success')
        return render_template('user/status.html')
    except IOError:
        flash('Password reset failed. Please try again later', 'category_danger')
    # after a failed reset the form is shown again
    return render_template('user/reset_password.html', form=form)
×
844

845

846
@APP.route("/reset_request", methods=['GET', 'POST'])
def reset_request():
    """
    Show and process the form requesting a password reset email.

    Requires ``mscolab_settings.MAIL_ENABLED``; otherwise a warning is logged
    and the 403 page is returned. For a valid submission the same flash
    message is shown whether or not the account exists. If sending raises
    ``IOError`` the form is rendered again with an apology.
    """
    if not mscolab_settings.MAIL_ENABLED:
        logging.warning("To send emails, the value of `MAIL_ENABLED` in `conf.py` should be set to True.")
        return render_template('errors/403.html'), 403

    form = ResetRequestForm()
    if not form.validate_on_submit():
        return render_template('user/reset_request.html', form=form)

    # Check whether user exists or not based on the db
    user = User.query.filter_by(emailid=form.email.data).first()
    if not user:
        flash('An email was sent if this user account exists', 'category_success')
        return render_template('user/status.html')

    try:
        recipient = form.email.data
        reset_password_url = url_for('reset_password',
                                     token=generate_confirmation_token(recipient), _external=True)
        html = render_template('user/reset_confirmation.html',
                               reset_password_url=reset_password_url, username=user.username)
        send_email(form.email.data, "MSColab Password reset request", html)
        flash('An email was sent if this user account exists', 'category_success')
        return render_template('user/status.html')
    except IOError:
        # the line break and leading blanks are part of the message text
        flash('''We apologize, but it seems that there was an issue sending
                    your request email. Please try again later.''', 'category_info')
    return render_template('user/reset_request.html', form=form)
×
874

875

876
# The SAML2 / single sign-on routes of this branch are only registered when
# USE_SAML2 is enabled in mscolab_settings; the branch runs at module load.
if mscolab_settings.USE_SAML2:
    # setup idp login config
    # NOTE(review): presumably this populates setup_saml2_backend.CONFIGURED_IDPS,
    # which the code of this branch reads -- confirm in the helper's definition.
    setup_saml2_backend()
×
879

880
    # set routes for SSO
UNCOV
881
    @APP.route('/available_idps/', methods=['GET'])
    def available_idps():
        """
        Render 'idp/available_idps.html' with the configured IDPs
        (Identity Providers) taken from setup_saml2_backend.CONFIGURED_IDPS.
        """
        return render_template('idp/available_idps.html',
                               configured_idps=setup_saml2_backend.CONFIGURED_IDPS), 200
×
890

891
    @APP.route("/idp_login/", methods=['POST'])
    def idp_login():
        """
        Start the SAML login with the IDP chosen in the posted form.

        Answers with a 303 redirect for the HTTP redirect binding, otherwise
        with a response built from the prepared request data. An unknown IDP
        or a NameError/AttributeError while preparing renders the 403 page.
        """
        selected_idp = request.form.get('selectedIdentityProvider')
        # first configured IDP with the selected name; None when there is no match
        sp_config = next(
            (entry['idp_data']['saml2client']
             for entry in setup_saml2_backend.CONFIGURED_IDPS
             if selected_idp == entry['idp_identity_name']),
            None,
        )

        try:
            acs_endpoints = sp_config.config.getattr("endpoints", "sp")["assertion_consumer_service"]
            _, response_binding = acs_endpoints[0]
            _, binding, http_args = sp_config.prepare_for_negotiated_authenticate(
                entityid=get_idp_entity_id(selected_idp),
                response_binding=response_binding,
            )
            if binding != BINDING_HTTP_REDIRECT:
                return Response(http_args["data"], headers=http_args["headers"])
            location = dict(http_args["headers"])["Location"]
            return redirect(str(location), code=303)
        except (NameError, AttributeError):
            return render_template('errors/403.html'), 403
×
916

917
    def create_acs_post_handler(config):
        """
        Create the assertion consumer service (ACS) POST handler for one IDP.

        :param config: one entry of setup_saml2_backend.CONFIGURED_IDPS; the
                       handler reads ``config['idp_data']['saml2client']`` and
                       ``config['idp_identity_name']``
        :return: the view function handling the SAML response of that IDP
        """
        def acs_post_handler():
            """
            Handle the SAML authentication response posted by the IDP.

            Extracts email and given name from the response, creates or
            updates the matching user and renders the login success page.
            Renders the 403 page when the attributes cannot be read and the
            500 page when the user could not be stored.
            """
            try:
                outstanding_queries = {}
                binding = BINDING_HTTP_POST
                authn_response = config['idp_data']['saml2client'].parse_authn_request_response(
                    request.form["SAMLResponse"], binding, outstanding=outstanding_queries
                )
                email = None
                username = None

                try:
                    email = authn_response.ava["email"][0]
                    username = authn_response.ava["givenName"][0]
                    token = generate_confirmation_token(email)
                except (NameError, AttributeError, KeyError):
                    # fall back to reading the raw attribute statements
                    try:
                        # Initialize an empty dictionary to store attribute values
                        attributes = {}

                        # Loop through attribute statements
                        for attribute_statement in authn_response.assertion.attribute_statement:
                            for attribute in attribute_statement.attribute:
                                attribute_name = attribute.name
                                attribute_value = \
                                    attribute.attribute_value[0].text if attribute.attribute_value else None
                                attributes[attribute_name] = attribute_value

                        # Extract the email and givenname attributes
                        email = attributes["email"]
                        username = attributes["givenName"]
                        token = generate_confirmation_token(email)
                    except (NameError, AttributeError, KeyError):
                        return render_template('errors/403.html'), 403

                if email is not None and username is not None:
                    # Use the config this handler was created for. The module
                    # level loop variable is rebound on every iteration and
                    # would name the last configured IDP for all handlers.
                    idp_user_db_state = create_or_update_idp_user(
                        email, username, token, config['idp_identity_name'])
                    if idp_user_db_state:
                        return render_template('idp/idp_login_success.html', token=token), 200
                    return render_template('errors/500.html'), 500
                return render_template('errors/500.html'), 500
            except (NameError, AttributeError, KeyError):
                return render_template('errors/403.html'), 403
        return acs_post_handler
×
968

969
    # Implementation for handling configured SAML assertion consumer endpoints
    # Every endpoint of every configured IDP gets its own POST route; the
    # endpoint string is used both for the URL path and as the endpoint name.
    # NOTE: the loop runs at module level, so `idp_config` stays bound to the
    # last configured IDP once the loop has finished.
    for idp_config in setup_saml2_backend.CONFIGURED_IDPS:
        try:
            for assertion_consumer_endpoint in idp_config['idp_data']['assertion_consumer_endpoints']:
                # Dynamically add the route for the current endpoint
                APP.add_url_rule(f'/{assertion_consumer_endpoint}/', assertion_consumer_endpoint,
                                 create_acs_post_handler(idp_config), methods=['POST'])
        except (NameError, AttributeError, KeyError) as ex:
            # a failing IDP entry is logged and the loop continues with the next one
            logging.warning("USE_SAML2 is %s, Failure is: %s", mscolab_settings.USE_SAML2, ex)
×
978

979
    @APP.route('/idp_login_auth/', methods=['POST'])
    def idp_login_auth():
        """
        Validate the token a client application received after the SAML login.

        The JSON body has to carry ``token``. When the token confirms an email
        (expiration 1200 seconds) and ``check_login`` accepts it, the user gets
        a fresh random token set as password and a JSON string with that token
        and the user data is returned. Every other outcome, including a
        TypeError, answers with HTTP 401.
        """
        try:
            token = request.get_json().get('token')
            email = confirm_token(token, expiration=1200)
            if not email:
                return jsonify({"success": False}), 401
            user = check_login(email, token)
            if not user:
                return jsonify({"success": False}), 401
            random_token = secrets.token_hex(16)
            user.hash_password(random_token)
            fm.modify_user(user, action="update_idp_user")
            user_info = {'username': user.username, 'id': user.id, 'emailid': user.emailid}
            return json.dumps({
                "success": True,
                'token': random_token,
                'user': user_info
            })
        except TypeError:
            return jsonify({"success": False}), 401
×
1001

1002
    @APP.route("/metadata/<idp_identity_name>", methods=['GET'])
    def metadata(idp_identity_name):
        """
        Return the SAML metadata XML for the requested IDP.

        Renders the 404 page when no configured IDP has that identity name.
        """
        for config in setup_saml2_backend.CONFIGURED_IDPS:
            if idp_identity_name != config['idp_identity_name']:
                continue
            sp_config = config['idp_data']['saml2client']
            raw_metadata = create_metadata_string(
                None, sp_config.config, 4, None, None, None, None, None
            )
            return Response(raw_metadata.decode("utf-8"), mimetype="text/xml")
        return render_template('errors/404.html'), 404
×
1013

1014

1015
def start_server(app, sockio, cm, fm, port=8083):
    """
    Call create_files() and then run ``app`` through ``sockio.run``.

    :param app: application object handed to ``sockio.run``
    :param sockio: object providing ``run(app, port=...)``
    :param cm: accepted but not used in this function
    :param fm: accepted but not used in this function
    :param port: port passed to ``sockio.run``, 8083 by default
    """
    create_files()
    sockio.run(app, port=port)
×
1018

1019

1020
def main():
    """Start the server with the module-level _app, sockio, cm and fm objects."""
    start_server(_app, sockio, cm, fm)
×
1022

1023

1024
# for wsgi: module-level `application` object wrapping sockio
# NOTE(review): presumably the name WSGI servers are configured to load -- confirm
# against the deployment documentation.
application = socketio.WSGIApp(sockio)

# run the server directly when this file is executed as a script
if __name__ == '__main__':
    main()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc