• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

scoringengine / scoringengine / 23385248069

21 Mar 2026 05:50PM UTC coverage: 73.202% (-2.5%) from 75.69%
23385248069

push

github

RustyBower
Fix test to match DB fallback behavior for missing output files

The endpoint now returns check.output from DB (200) instead of 404
when the on-disk file doesn't exist.

3726 of 5090 relevant lines covered (73.2%)

0.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.13
/scoring_engine/web/views/api/admin.py
1
import html
1✔
2
import json
1✔
3
import os
1✔
4
import re
1✔
5
import time
1✔
6
from datetime import datetime, timezone
1✔
7

8
import pytz
1✔
9
from dateutil.parser import parse
1✔
10
from flask import flash, jsonify, redirect, request, url_for
1✔
11
from flask_login import current_user, login_required
1✔
12

13

14
def _ensure_utc_aware(dt):
1✔
15
    """Ensure datetime is timezone-aware in UTC. Handles both naive and aware datetimes."""
16
    if dt is None:
1✔
17
        return None
×
18
    if dt.tzinfo is None:
1✔
19
        # Naive datetime - assume UTC
20
        return pytz.utc.localize(dt)
1✔
21
    # Already aware - convert to UTC
22
    return dt.astimezone(pytz.utc)
×
23

24

25
from sqlalchemy.orm import joinedload
1✔
26
from sqlalchemy.orm.exc import NoResultFound
1✔
27
from sqlalchemy.sql import func
1✔
28

29
from scoring_engine.cache_helper import (
1✔
30
    update_all_cache,
31
    update_all_inject_data,
32
    update_inject_comments,
33
    update_inject_data,
34
    update_overview_data,
35
    update_scoreboard_data,
36
    update_service_data,
37
    update_services_data,
38
    update_services_navbar,
39
    update_team_stats,
40
)
41
from scoring_engine.celery_stats import CeleryStats
1✔
42
from scoring_engine.config import config
1✔
43
from scoring_engine.db import db
1✔
44
from scoring_engine.engine.basic_check import CHECK_FAILURE_TEXT, CHECK_SUCCESS_TEXT, CHECK_TIMED_OUT_TEXT
1✔
45
from scoring_engine.engine.engine import Engine
1✔
46
from scoring_engine.engine.execute_command import execute_command
1✔
47
from scoring_engine.models.check import Check
1✔
48
from scoring_engine.models.environment import Environment
1✔
49
from scoring_engine.models.inject import Inject, InjectComment, InjectRubricScore, RubricItem, Template
1✔
50
from scoring_engine.models.kb import KB
1✔
51
from scoring_engine.models.property import Property
1✔
52
from scoring_engine.models.round import Round
1✔
53
from scoring_engine.models.service import Service
1✔
54
from scoring_engine.models.setting import Setting
1✔
55
from scoring_engine.models.team import Team
1✔
56
from scoring_engine.models.user import User
1✔
57
from scoring_engine.models.welcome import get_welcome_config, save_welcome_config
1✔
58
from scoring_engine.notifications import notify_inject_graded, notify_revision_requested
1✔
59

60
from . import mod
1✔
61

62

63
@mod.route("/api/admin/update_environment_info", methods=["POST"])
1✔
64
@login_required
1✔
65
def admin_update_environment():
1✔
66
    if current_user.is_white_team:
1✔
67
        if "name" in request.form and "value" in request.form and "pk" in request.form:
1✔
68
            environment = db.session.get(Environment, int(request.form["pk"]))
1✔
69
            if environment:
1✔
70
                if request.form["name"] in ("matching_content", "matching_content_reject"):
1✔
71
                    value = html.escape(request.form["value"])
1✔
72
                    if value:
1✔
73
                        try:
1✔
74
                            re.compile(value)
1✔
75
                        except re.error as e:
1✔
76
                            return jsonify({"error": "Invalid regex pattern: " + str(e)}), 400
1✔
77
                    setattr(environment, request.form["name"], value or None)
1✔
78
                db.session.add(environment)
1✔
79
                db.session.commit()
1✔
80
                return jsonify({"status": "Updated Environment Information"})
1✔
81
            return jsonify({"error": "Incorrect permissions"})
1✔
82
    return jsonify({"error": "Incorrect permissions"})
1✔
83

84

85
@mod.route("/api/admin/update_property", methods=["POST"])
1✔
86
@login_required
1✔
87
def admin_update_property():
1✔
88
    if current_user.is_white_team:
1✔
89
        if "name" in request.form and "value" in request.form and "pk" in request.form:
1✔
90
            property_obj = db.session.get(Property, int(request.form["pk"]))
1✔
91
            if property_obj:
1✔
92
                if request.form["name"] == "property_name":
1✔
93
                    property_obj.name = html.escape(request.form["value"])
1✔
94
                elif request.form["name"] == "property_value":
1✔
95
                    property_obj.value = html.escape(request.form["value"])
1✔
96
                db.session.add(property_obj)
1✔
97
                db.session.commit()
1✔
98
                return jsonify({"status": "Updated Property Information"})
1✔
99
            return jsonify({"error": "Incorrect permissions"})
1✔
100
    return jsonify({"error": "Incorrect permissions"})
1✔
101

102

103
@mod.route("/api/admin/update_check", methods=["POST"])
1✔
104
@login_required
1✔
105
def admin_update_check():
1✔
106
    if current_user.is_white_team:
1✔
107
        if "name" in request.form and "value" in request.form and "pk" in request.form:
1✔
108
            check = db.session.get(Check, int(request.form["pk"]))
1✔
109
            if check:
1✔
110
                modified_check = False
1✔
111
                if request.form["name"] == "check_value":
1✔
112
                    if request.form["value"] == "1":
1✔
113
                        check.result = True
1✔
114
                    elif request.form["value"] == "2":
1✔
115
                        check.result = False
1✔
116
                    modified_check = True
1✔
117
                elif request.form["name"] == "check_reason":
1✔
118
                    modified_check = True
1✔
119
                    check.reason = request.form["value"]
1✔
120
                if modified_check:
1✔
121
                    db.session.add(check)
1✔
122
                    db.session.commit()
1✔
123
                    update_scoreboard_data()
1✔
124
                    update_overview_data()
1✔
125
                    update_services_navbar(check.service.team.id)
1✔
126
                    update_team_stats(check.service.team.id)
1✔
127
                    update_services_data(check.service.team.id)
1✔
128
                    update_service_data(check.service.id)
1✔
129
                    return jsonify({"status": "Updated Property Information"})
1✔
130
            return jsonify({"error": "Incorrect permissions"})
1✔
131
    return jsonify({"error": "Incorrect permissions"})
1✔
132

133

134
@mod.route("/api/admin/update_host", methods=["POST"])
1✔
135
@login_required
1✔
136
def admin_update_host():
1✔
137
    if current_user.is_white_team:
×
138
        if "name" in request.form and "value" in request.form and "pk" in request.form:
×
139
            service = db.session.get(Service, int(request.form["pk"]))
×
140
            if service:
×
141
                if request.form["name"] == "host":
×
142
                    service.host = html.escape(request.form["value"])
×
143
                    db.session.add(service)
×
144
                    db.session.commit()
×
145
                    update_overview_data()
×
146
                    update_services_data(service.team.id)
×
147
                    update_service_data(service.id)
×
148
                    return jsonify({"status": "Updated Service Information"})
×
149
    return jsonify({"error": "Incorrect permissions"})
×
150

151

152
@mod.route("/api/admin/update_port", methods=["POST"])
1✔
153
@login_required
1✔
154
def admin_update_port():
1✔
155
    if current_user.is_white_team:
×
156
        if "name" in request.form and "value" in request.form and "pk" in request.form:
×
157
            service = db.session.get(Service, int(request.form["pk"]))
×
158
            if service:
×
159
                if request.form["name"] == "port":
×
160
                    service.port = int(request.form["value"])
×
161
                    db.session.add(service)
×
162
                    db.session.commit()
×
163
                    update_overview_data()
×
164
                    update_services_data(service.team.id)
×
165
                    update_service_data(service.id)
×
166
                    return jsonify({"status": "Updated Service Information"})
×
167
    return jsonify({"error": "Incorrect permissions"})
×
168

169

170
@mod.route("/api/admin/update_worker_queue", methods=["POST"])
1✔
171
@login_required
1✔
172
def admin_update_worker_queue():
1✔
173
    if current_user.is_white_team:
×
174
        if "name" in request.form and "value" in request.form and "pk" in request.form:
×
175
            service = db.session.get(Service, int(request.form["pk"]))
×
176
            if service:
×
177
                if request.form["name"] == "worker_queue":
×
178
                    service.worker_queue = request.form["value"]
×
179
                    db.session.add(service)
×
180
                    db.session.commit()
×
181
                    return jsonify({"status": "Updated Service Information"})
×
182
    return jsonify({"error": "Incorrect permissions"})
×
183

184

185
@mod.route("/api/admin/update_points", methods=["POST"])
1✔
186
@login_required
1✔
187
def admin_update_points():
1✔
188
    if current_user.is_white_team:
×
189
        if "name" in request.form and "value" in request.form and "pk" in request.form:
×
190
            service = db.session.get(Service, int(request.form["pk"]))
×
191
            if service:
×
192
                if request.form["name"] == "points":
×
193
                    service.points = int(request.form["value"])
×
194
                    db.session.add(service)
×
195
                    db.session.commit()
×
196
                    return jsonify({"status": "Updated Service Information"})
×
197
    return jsonify({"error": "Incorrect permissions"})
×
198

199

200
@mod.route("/api/admin/update_about_page_content", methods=["POST"])
1✔
201
@login_required
1✔
202
def admin_update_about_page_content():
1✔
203
    if current_user.is_white_team:
×
204
        if "about_page_content" in request.form:
×
205
            setting = Setting.get_setting("about_page_content")
×
206
            setting.value = request.form["about_page_content"]
×
207
            db.session.add(setting)
×
208
            db.session.commit()
×
209
            Setting.clear_cache("about_page_content")
×
210
            flash("About Page Content Successfully Updated.", "success")
×
211
            return redirect(url_for("admin.settings"))
×
212
        flash("Error: about_page_content not specified.", "danger")
×
213
        return redirect(url_for("admin.manage"))
×
214
    return {"status": "Unauthorized"}, 403
×
215

216

217
@mod.route("/api/admin/update_welcome_page_content", methods=["POST"])
1✔
218
@login_required
1✔
219
def admin_update_welcome_page_content():
1✔
220
    if current_user.is_white_team:
×
221
        if "welcome_page_content" in request.form:
×
222
            setting = Setting.get_setting("welcome_page_content")
×
223
            setting.value = request.form["welcome_page_content"]
×
224
            db.session.add(setting)
×
225
            db.session.commit()
×
226
            Setting.clear_cache("welcome_page_content")
×
227
            flash("Welcome Page Content Successfully Updated.", "success")
×
228
            return redirect(url_for("admin.settings"))
×
229
        flash("Error: welcome_page_content not specified.", "danger")
×
230
        return redirect(url_for("admin.manage"))
×
231
    return {"status": "Unauthorized"}, 403
×
232

233

234
@mod.route("/api/admin/update_target_round_time", methods=["POST"])
1✔
235
@login_required
1✔
236
def admin_update_target_round_time():
1✔
237
    if current_user.is_white_team:
×
238
        if "target_round_time" in request.form:
×
239
            setting = Setting.get_setting("target_round_time")
×
240
            input_time = request.form["target_round_time"]
×
241
            if not input_time.isdigit():
×
242
                flash("Error: Target Round Time must be an integer.", "danger")
×
243
                return redirect(url_for("admin.settings"))
×
244
            setting.value = input_time
×
245
            db.session.add(setting)
×
246
            db.session.commit()
×
247
            Setting.clear_cache("target_round_time")
×
248
            flash("Target Round Time Successfully Updated.", "success")
×
249
            return redirect(url_for("admin.settings"))
×
250
        flash("Error: target_round_time not specified.", "danger")
×
251
        return redirect(url_for("admin.settings"))
×
252
    return {"status": "Unauthorized"}, 403
×
253

254

255
@mod.route("/api/admin/update_worker_refresh_time", methods=["POST"])
1✔
256
@login_required
1✔
257
def admin_update_worker_refresh_time():
1✔
258
    if current_user.is_white_team:
×
259
        if "worker_refresh_time" in request.form:
×
260
            setting = Setting.get_setting("worker_refresh_time")
×
261
            input_time = request.form["worker_refresh_time"]
×
262
            if not input_time.isdigit():
×
263
                flash("Error: Worker Refresh Time must be an integer.", "danger")
×
264
                return redirect(url_for("admin.settings"))
×
265
            setting.value = input_time
×
266
            db.session.add(setting)
×
267
            db.session.commit()
×
268
            Setting.clear_cache("worker_refresh_time")
×
269
            flash("Worker Refresh Time Successfully Updated.", "success")
×
270
            return redirect(url_for("admin.settings"))
×
271
        flash("Error: worker_refresh_time not specified.", "danger")
×
272
        return redirect(url_for("admin.settings"))
×
273
    return {"status": "Unauthorized"}, 403
×
274

275

276
@mod.route("/api/admin/update_blueteam_edit_hostname", methods=["POST"])
1✔
277
@login_required
1✔
278
def admin_update_blueteam_edit_hostname():
1✔
279
    if current_user.is_white_team:
×
280
        setting = Setting.get_setting("blue_team_update_hostname")
×
281
        if setting.value is True:
×
282
            setting.value = False
×
283
        else:
284
            setting.value = True
×
285
        db.session.add(setting)
×
286
        db.session.commit()
×
287
        Setting.clear_cache("blue_team_update_hostname")
×
288
        return redirect(url_for("admin.permissions"))
×
289
    return {"status": "Unauthorized"}, 403
×
290

291

292
@mod.route("/api/admin/update_blueteam_edit_port", methods=["POST"])
1✔
293
@login_required
1✔
294
def admin_update_blueteam_edit_port():
1✔
295
    if current_user.is_white_team:
×
296
        setting = Setting.get_setting("blue_team_update_port")
×
297
        if setting.value is True:
×
298
            setting.value = False
×
299
        else:
300
            setting.value = True
×
301
        db.session.add(setting)
×
302
        db.session.commit()
×
303
        Setting.clear_cache("blue_team_update_port")
×
304
        return redirect(url_for("admin.permissions"))
×
305
    return {"status": "Unauthorized"}, 403
×
306

307

308
@mod.route("/api/admin/update_blueteam_edit_account_usernames", methods=["POST"])
1✔
309
@login_required
1✔
310
def admin_update_blueteam_edit_account_usernames():
1✔
311
    if current_user.is_white_team:
×
312
        setting = Setting.get_setting("blue_team_update_account_usernames")
×
313
        if setting.value is True:
×
314
            setting.value = False
×
315
        else:
316
            setting.value = True
×
317
        db.session.add(setting)
×
318
        db.session.commit()
×
319
        Setting.clear_cache("blue_team_update_account_usernames")
×
320
        return redirect(url_for("admin.permissions"))
×
321
    return {"status": "Unauthorized"}, 403
×
322

323

324
@mod.route("/api/admin/update_blueteam_edit_account_passwords", methods=["POST"])
1✔
325
@login_required
1✔
326
def admin_update_blueteam_edit_account_passwords():
1✔
327
    if current_user.is_white_team:
×
328
        setting = Setting.get_setting("blue_team_update_account_passwords")
×
329
        if setting.value is True:
×
330
            setting.value = False
×
331
        else:
332
            setting.value = True
×
333
        db.session.add(setting)
×
334
        db.session.commit()
×
335
        Setting.clear_cache("blue_team_update_account_passwords")
×
336
        return redirect(url_for("admin.permissions"))
×
337
    return {"status": "Unauthorized"}, 403
×
338

339

340
@mod.route("/api/admin/update_blueteam_view_check_output", methods=["POST"])
1✔
341
@login_required
1✔
342
def admin_update_blueteam_view_check_output():
1✔
343
    if current_user.is_white_team:
×
344
        setting = Setting.get_setting("blue_team_view_check_output")
×
345
        if setting.value is True:
×
346
            setting.value = False
×
347
        else:
348
            setting.value = True
×
349
        db.session.add(setting)
×
350
        db.session.commit()
×
351
        Setting.clear_cache("blue_team_view_check_output")
×
352
        return redirect(url_for("admin.permissions"))
×
353
    return {"status": "Unauthorized"}, 403
×
354

355

356
@mod.route("/api/admin/update_anonymize_team_names", methods=["POST"])
1✔
357
@login_required
1✔
358
def admin_update_anonymize_team_names():
1✔
359
    if current_user.is_white_team:
×
360
        setting = Setting.get_setting("anonymize_team_names")
×
361
        if setting is None:
×
362
            # Create the setting if it doesn't exist
363
            setting = Setting(name="anonymize_team_names", value=True)
×
364
        elif setting.value is True:
×
365
            setting.value = False
×
366
        else:
367
            setting.value = True
×
368
        db.session.add(setting)
×
369
        db.session.commit()
×
370
        Setting.clear_cache("anonymize_team_names")
×
371
        return redirect(url_for("admin.permissions"))
×
372
    return {"status": "Unauthorized"}, 403
×
373

374

375
@mod.route("/api/admin/check/<int:check_id>/full_output")
1✔
376
@login_required
1✔
377
def admin_get_check_full_output(check_id):
1✔
378
    if current_user.is_white_team:
1✔
379
        check = db.session.get(Check, check_id)
1✔
380
        if not check:
1✔
381
            return jsonify({"error": "Check not found"}), 404
1✔
382

383
        team_name = check.service.team.name
1✔
384
        service_name = check.service.name
1✔
385
        round_num = check.round.number
1✔
386

387
        # Try disk file first, fall back to DB output
388
        output_path = os.path.join(
1✔
389
            config.check_output_folder,
390
            team_name,
391
            service_name,
392
            f"round_{round_num}.txt",
393
        )
394

395
        if os.path.isfile(output_path):
1✔
396
            with open(output_path, "r") as f:
1✔
397
                content = f.read()
1✔
398
        else:
399
            content = check.output or ""
1✔
400

401
        return content, 200, {"Content-Type": "text/plain; charset=utf-8"}
1✔
402
    return {"status": "Unauthorized"}, 403
1✔
403

404

405
@mod.route("/api/admin/get_round_progress")
1✔
406
@login_required
1✔
407
def get_check_progress_total():
1✔
408
    if current_user.is_white_team:
×
409
        task_id_settings = db.session.query(KB).filter_by(name="task_ids").order_by(KB.round_num.desc()).first()
×
410
        total_stats = {}
×
411
        total_stats["finished"] = 0
×
412
        total_stats["pending"] = 0
×
413

414
        team_stats = {}
×
415
        if task_id_settings:
×
416
            task_dict = json.loads(task_id_settings.value)
×
417
            for team_name, task_ids in task_dict.items():
×
418
                for task_id in task_ids:
×
419
                    task = execute_command.AsyncResult(task_id)
×
420
                    if team_name not in team_stats:
×
421
                        team_stats[team_name] = {}
×
422
                        team_stats[team_name]["pending"] = 0
×
423
                        team_stats[team_name]["finished"] = 0
×
424

425
                    if task.state == "PENDING":
×
426
                        team_stats[team_name]["pending"] += 1
×
427
                        total_stats["pending"] += 1
×
428
                    else:
429
                        team_stats[team_name]["finished"] += 1
×
430
                        total_stats["finished"] += 1
×
431

432
        total_percentage = 0
×
433
        total_tasks = total_stats["finished"] + total_stats["pending"]
×
434
        if total_stats["finished"] == 0:
×
435
            total_percentage = 0
×
436
        elif total_tasks == 0:
×
437
            total_percentage = 100
×
438
        elif total_stats and total_stats["finished"]:
×
439
            total_percentage = int((total_stats["finished"] / total_tasks) * 100)
×
440

441
        output_dict = {"Total": total_percentage}
×
442
        for team_name, team_stat in team_stats.items():
×
443
            team_total_percentage = 0
×
444
            team_total_tasks = team_stat["finished"] + team_stat["pending"]
×
445
            if team_stat["finished"] == 0:
×
446
                team_total_percentage = 0
×
447
            elif team_total_tasks == 0:
×
448
                team_total_percentage = 100
×
449
            elif team_stat and team_stat["finished"]:
×
450
                team_total_percentage = int((team_stat["finished"] / team_total_tasks) * 100)
×
451
            output_dict[team_name] = team_total_percentage
×
452

453
        return json.dumps(output_dict)
×
454
    else:
455
        return {"status": "Unauthorized"}, 403
×
456

457

458
@mod.route("/api/admin/inject/<inject_id>/reopen", methods=["POST"])
1✔
459
@login_required
1✔
460
def admin_post_inject_reopen(inject_id):
1✔
461
    if current_user.is_white_team:
1✔
462
        inject = db.session.get(Inject, inject_id)
1✔
463
        if not inject:
1✔
464
            return jsonify({"status": "Invalid Inject ID"}), 400
1✔
465
        if inject.status != "Graded":
1✔
466
            return jsonify({"status": "Inject is not in Graded state"}), 400
1✔
467

468
        inject.status = "Submitted"
1✔
469
        inject.graded = None
1✔
470
        comment = InjectComment(
1✔
471
            f"Inject reopened for re-grading by {current_user.username}.",
472
            current_user,
473
            inject,
474
        )
475
        db.session.add(comment)
1✔
476
        db.session.commit()
1✔
477
        update_inject_data(inject_id)
1✔
478
        update_inject_comments(inject_id)
1✔
479
        update_scoreboard_data()
1✔
480
        return jsonify({"status": "Success"}), 200
1✔
481
    else:
482
        return {"status": "Unauthorized"}, 403
1✔
483

484

485
@mod.route("/api/admin/injects/templates/<template_id>")
1✔
486
@login_required
1✔
487
def admin_get_inject_templates_id(template_id):
1✔
488
    if current_user.is_white_team:
1✔
489
        template = db.session.get(Template, int(template_id))
1✔
490
        data = dict(
1✔
491
            id=template.id,
492
            title=template.title,
493
            scenario=template.scenario,
494
            deliverable=template.deliverable,
495
            max_score=template.max_score,
496
            start_time=_ensure_utc_aware(template.start_time).astimezone(pytz.timezone(config.timezone)).isoformat(),
497
            end_time=_ensure_utc_aware(template.end_time).astimezone(pytz.timezone(config.timezone)).isoformat(),
498
            enabled=template.enabled,
499
            rubric_items=[
500
                {"id": x.id, "title": x.title, "description": x.description, "points": x.points, "order": x.order}
501
                for x in template.rubric_items
502
            ],
503
            teams=[inject.team.name for inject in template.injects if inject.enabled and inject.team],
504
        )
505
        return jsonify(data)
1✔
506
    else:
507
        return jsonify({"status": "Unauthorized"}), 403
×
508

509

510
@mod.route("/api/admin/injects/templates/<template_id>", methods=["PUT"])
1✔
511
@login_required
1✔
512
def admin_put_inject_templates_id(template_id):
1✔
513
    if current_user.is_white_team:
×
514
        data = request.get_json()
×
515
        template = db.session.get(Template, int(template_id))
×
516
        if template:
×
517
            if data.get("title"):
×
518
                template.title = data["title"]
×
519
            if data.get("scenario"):
×
520
                template.scenario = data["scenario"]
×
521
            if data.get("deliverable"):
×
522
                template.deliverable = data["deliverable"]
×
523
            if data.get("start_time"):
×
524
                template.start_time = parse(data["start_time"]).astimezone(pytz.utc).replace(tzinfo=None)
×
525
            if data.get("end_time"):
×
526
                template.end_time = parse(data["end_time"]).astimezone(pytz.utc).replace(tzinfo=None)
×
527
            # TODO - Fix this to not be string values from javascript select
528
            if data.get("status") == "Enabled":
×
529
                template.enabled = True
×
530
            elif data.get("status") == "Disabled":
×
531
                template.enabled = False
×
532
            if "rubric_items" in data:
×
533
                # Update existing rubric items in place to preserve scores
534
                existing_by_id = {item.id: item for item in template.rubric_items}
×
535
                incoming_ids = set()
×
536
                for idx, ri in enumerate(data["rubric_items"]):
×
537
                    ri_id = ri.get("id")
×
538
                    if ri_id and ri_id in existing_by_id:
×
539
                        # Update existing item
540
                        item = existing_by_id[ri_id]
×
541
                        item.title = ri["title"]
×
542
                        item.points = ri["points"]
×
543
                        item.description = ri.get("description")
×
544
                        item.order = ri.get("order", idx)
×
545
                        incoming_ids.add(ri_id)
×
546
                    else:
547
                        # Create new item
548
                        rubric_item = RubricItem(
×
549
                            title=ri["title"],
550
                            points=ri["points"],
551
                            template=template,
552
                            description=ri.get("description"),
553
                            order=ri.get("order", idx),
554
                        )
555
                        db.session.add(rubric_item)
×
556
                # Delete only items that were removed from the payload
557
                for item_id, item in existing_by_id.items():
×
558
                    if item_id not in incoming_ids:
×
559
                        db.session.delete(item)
×
560
            if data.get("selectedTeams"):
×
561
                for team_name in data["selectedTeams"]:
×
562
                    inject = (
×
563
                        db.session.query(Inject)
564
                        .join(Template)
565
                        .join(Team)
566
                        .filter(Team.name == team_name)
567
                        .filter(Template.id == template_id)
568
                        .one_or_none()
569
                    )
570
                    # Update inject if it exists
571
                    if inject:
×
572
                        inject.enabled = True
×
573
                    # Otherwise, create the inject
574
                    else:
575
                        team = db.session.query(Team).filter(Team.name == team_name).first()
×
576
                        inject = Inject(
×
577
                            team=team,
578
                            template=template,
579
                        )
580
                        db.session.add(inject)
×
581
            if data.get("unselectedTeams"):
×
582
                injects = (
×
583
                    db.session.query(Inject)
584
                    .join(Template)
585
                    .join(Team)
586
                    .filter(Team.name.in_(data["unselectedTeams"]))
587
                    .filter(Template.id == template_id)
588
                    .all()
589
                )
590
                for inject in injects:
×
591
                    inject.enabled = False
×
592
            db.session.commit()
×
593
            return jsonify({"status": "Success"}), 200
×
594
        else:
595
            return jsonify({"status": "Error", "message": "Template not found"}), 400
×
596

597
    else:
598
        return jsonify({"status": "Unauthorized"}), 403
×
599

600

601
@mod.route("/api/admin/injects/templates/<template_id>", methods=["DELETE"])
1✔
602
@login_required
1✔
603
def admin_delete_inject_templates_id(template_id):
1✔
604
    if current_user.is_white_team:
×
605
        template = db.session.get(Template, int(template_id))
×
606
        if template:
×
607
            db.session.delete(template)
×
608
            db.session.commit()
×
609
            return jsonify({"status": "Success"}), 200
×
610
        else:
611
            return jsonify({"status": "Error", "message": "Template not found"}), 400
×
612
    else:
613
        return jsonify({"status": "Unauthorized"}), 403
×
614

615

616
@mod.route("/api/admin/injects/templates")
1✔
617
@login_required
1✔
618
def admin_get_inject_templates():
1✔
619
    if current_user.is_white_team:
×
620
        data = list()
×
621
        templates = db.session.query(Template).options(joinedload(Template.injects)).all()
×
622
        for template in templates:
×
623
            data.append(
×
624
                dict(
625
                    id=template.id,
626
                    title=template.title,
627
                    scenario=template.scenario,
628
                    deliverable=template.deliverable,
629
                    max_score=template.max_score,
630
                    start_time=template.start_time.astimezone(pytz.timezone(config.timezone)).isoformat(),
631
                    end_time=template.end_time.astimezone(pytz.timezone(config.timezone)).isoformat(),
632
                    enabled=template.enabled,
633
                    rubric_items=[
634
                        {
635
                            "id": x.id,
636
                            "title": x.title,
637
                            "description": x.description,
638
                            "points": x.points,
639
                            "order": x.order,
640
                        }
641
                        for x in template.rubric_items
642
                    ],
643
                    teams=[inject.team.name for inject in template.injects if inject if inject.enabled if inject.team],
644
                )
645
            )
646
        return jsonify(data=data)
×
647
    else:
648
        return {"status": "Unauthorized"}, 403
×
649

650

651
@mod.route("/api/admin/inject/<inject_id>/grade", methods=["POST"])
1✔
652
@login_required
1✔
653
def admin_post_inject_grade(inject_id):
1✔
654
    if current_user.is_white_team:
1✔
655
        data = request.get_json()
1✔
656
        if "rubric_scores" not in data or not data["rubric_scores"]:
1✔
657
            return jsonify({"status": "Invalid Score Provided"}), 400
×
658
        inject = db.session.get(Inject, inject_id)
1✔
659
        if not inject:
1✔
660
            return jsonify({"status": "Invalid Inject ID"}), 400
×
661

662
        # Validate rubric items belong to the template
663
        template_item_ids = {item.id for item in inject.template.rubric_items}
1✔
664
        template_item_max = {item.id: item.points for item in inject.template.rubric_items}
1✔
665
        for rs in data["rubric_scores"]:
1✔
666
            if rs["rubric_item_id"] not in template_item_ids:
1✔
667
                return jsonify({"status": "Invalid rubric item ID"}), 400
×
668
            if rs["score"] > template_item_max[rs["rubric_item_id"]]:
1✔
669
                return jsonify({"status": "Score exceeds rubric item max"}), 400
×
670

671
        # Delete existing scores and create new ones
672
        for old_score in list(inject.rubric_scores):
1✔
673
            db.session.delete(old_score)
×
674
        db.session.flush()
1✔
675

676
        for rs in data["rubric_scores"]:
1✔
677
            rubric_item = db.session.get(RubricItem, rs["rubric_item_id"])
1✔
678
            score_obj = InjectRubricScore(
1✔
679
                score=rs["score"],
680
                inject=inject,
681
                rubric_item=rubric_item,
682
                grader=current_user,
683
            )
684
            db.session.add(score_obj)
1✔
685

686
        inject.graded = datetime.now(timezone.utc)
1✔
687
        inject.status = "Graded"
1✔
688
        score = sum(rs["score"] for rs in data["rubric_scores"])
1✔
689
        comment = InjectComment(
1✔
690
            f"Inject graded by {current_user.username}. Score: {score}/{inject.template.max_score}.",
691
            current_user,
692
            inject,
693
        )
694
        db.session.add(comment)
1✔
695
        db.session.commit()
1✔
696
        update_inject_data(inject_id)
1✔
697
        update_inject_comments(inject_id)
1✔
698
        update_scoreboard_data()
1✔
699
        notify_inject_graded(inject)
1✔
700
        return jsonify({"status": "Success"}), 200
1✔
701
    else:
702
        return {"status": "Unauthorized"}, 403
×
703

704

705
@mod.route("/api/admin/inject/<inject_id>/request-revision", methods=["POST"])
1✔
706
@login_required
1✔
707
def admin_post_inject_request_revision(inject_id):
1✔
708
    if current_user.is_white_team:
×
709
        inject = db.session.get(Inject, inject_id)
×
710
        if not inject:
×
711
            return jsonify({"status": "Invalid Inject ID"}), 400
×
712
        if inject.status not in ("Submitted", "Resubmitted"):
×
713
            return jsonify({"status": "Inject is not in a submittable state"}), 400
×
714

715
        data = request.get_json() or {}
×
716
        reason = data.get("reason", "Revision requested by grader.")
×
717

718
        inject.status = "Revision Requested"
×
719
        # Create a system comment with the reason
720
        comment = InjectComment(content=reason, user=current_user, inject=inject)
×
721
        db.session.add(comment)
×
722
        db.session.commit()
×
723
        update_inject_data(inject_id)
×
724
        update_inject_comments(inject_id)
×
725
        notify_revision_requested(inject)
×
726
        return jsonify({"status": "Success"}), 200
×
727
    else:
728
        return {"status": "Unauthorized"}, 403
×
729

730

731
@mod.route("/api/admin/injects/template/<int:template_id>/submissions")
1✔
732
@login_required
1✔
733
def admin_get_template_submissions(template_id):
1✔
734
    if current_user.is_white_team:
×
735
        template = db.session.get(Template, template_id)
×
736
        if not template:
×
737
            return jsonify({"status": "Template not found"}), 404
×
738

739
        submissions = []
×
740
        for inject in template.injects:
×
741
            if not inject.team or not inject.enabled:
×
742
                continue
×
743
            submissions.append(
×
744
                {
745
                    "id": inject.id,
746
                    "team": inject.team.name,
747
                    "status": inject.status,
748
                    "score": inject.score,
749
                    "max_score": template.max_score,
750
                    "submitted": inject.submitted.isoformat() if inject.submitted else None,
751
                    "graded": inject.graded.isoformat() if inject.graded else None,
752
                }
753
            )
754
        return jsonify(data=submissions), 200
×
755
    else:
756
        return {"status": "Unauthorized"}, 403
×
757

758

759
@mod.route("/api/admin/injects/templates", methods=["POST"])
1✔
760
@login_required
1✔
761
def admin_post_inject_templates():
1✔
762
    if current_user.is_white_team:
×
763
        data = request.get_json()
×
764
        if (
×
765
            data.get("title")
766
            and data.get("scenario")
767
            and data.get("deliverable")
768
            and data.get("start_time")
769
            and data.get("end_time")
770
        ):
771
            template = Template(
×
772
                title=data["title"],
773
                scenario=data["scenario"],
774
                deliverable=data["deliverable"],
775
                start_time=parse(data["start_time"]).astimezone(pytz.utc).replace(tzinfo=None),
776
                end_time=parse(data["end_time"]).astimezone(pytz.utc).replace(tzinfo=None),
777
            )
778
            db.session.add(template)
×
779
            db.session.flush()
×
780
            # Create rubric items
781
            if data.get("rubric_items"):
×
782
                for i, item_data in enumerate(data["rubric_items"]):
×
783
                    rubric_item = RubricItem(
×
784
                        title=item_data["title"],
785
                        description=item_data.get("description", ""),
786
                        points=item_data["points"],
787
                        order=item_data.get("order", i),
788
                        template=template,
789
                    )
790
                    db.session.add(rubric_item)
×
791
            db.session.commit()
×
792
            # TODO - Fix this to not be string values from javascript select
793
            if data.get("status") == "Enabled":
×
794
                template.enabled = True
×
795
            elif data.get("status") == "Disabled":
×
796
                template.enabled = False
×
797
            if data.get("selectedTeams"):
×
798
                for team_name in data["selectedTeams"]:
×
799
                    inject = (
×
800
                        db.session.query(Inject)
801
                        .join(Template)
802
                        .join(Team)
803
                        .filter(Team.name == team_name)
804
                        .filter(Template.id == template.id)
805
                        .one_or_none()
806
                    )
807
                    # Update inject if it exists
808
                    if inject:
×
809
                        inject.enabled = True
×
810
                    # Otherwise, create the inject
811
                    else:
812
                        team = db.session.query(Team).filter(Team.name == team_name).first()
×
813
                        inject = Inject(
×
814
                            team=team,
815
                            template=template,
816
                        )
817
                        db.session.add(inject)
×
818
            if data.get("unselectedTeams"):
×
819
                injects = (
×
820
                    db.session.query(Inject)
821
                    .join(Template)
822
                    .join(Team)
823
                    .filter(Team.name.in_(data["unselectedTeams"]))
824
                    .filter(Template.id == template.id)
825
                    .all()
826
                )
827
                for inject in injects:
×
828
                    inject.enabled = False
×
829
            db.session.commit()
×
830
            return jsonify({"status": "Success"}), 200
×
831
        else:
832
            return jsonify({"status": "Error", "message": "Missing Data"}), 400
×
833
    else:
834
        return {"status": "Unauthorized"}, 403
×
835

836

837
# @mod.route("/api/admin/injects/templates/export")
838
# @login_required
839
# def admin_export_inject_templates():
840
#     if current_user.is_white_team:
841
#         data = []
842
#         templates = db.session.query(Template).all()
843
#         for template in templates:
844
#             data.append(
845
#                 dict(
846
#                     id=template.id,
847
#                     title=template.title,
848
#                     scenario=template.scenario,
849
#                     deliverable=template.deliverable,
850
#                     start_time=template.start_time,
851
#                     end_time=template.end_time,
852
#                     enabled=template.enabled,
853
#                     rubric=[
854
#                         {"id": x.id, "value": x.value, "deliverable": x.deliverable}
855
#                         for x in template.rubric
856
#                     ],
857
#                     # TODO - export teams
858
#                 )
859
#             )
860
#         return jsonify(data=data)
861
#     else:
862
#         return {"status": "Unauthorized"}, 403
863

864

865
# TODO - Generate injects from templates
866
@mod.route("/api/admin/injects/templates/import", methods=["POST"])
1✔
867
@login_required
1✔
868
def admin_import_inject_templates():
1✔
869
    if current_user.is_white_team:
1✔
870
        data = request.get_json()
1✔
871
        if data:
1✔
872
            for d in data:
1✔
873
                if d.get("id"):
1✔
874
                    template_id = d["id"]
1✔
875
                    t = db.session.get(Template, int(template_id))
1✔
876
                    # Update template if it exists
877
                    if t:
1✔
878
                        if d.get("title"):
1✔
879
                            t.title = d["title"]
1✔
880
                        if d.get("scenario"):
1✔
881
                            t.scenario = d["scenario"]
1✔
882
                        if d.get("deliverable"):
1✔
883
                            t.deliverable = d["deliverable"]
1✔
884
                        if d.get("start_time"):
1✔
885
                            t.start_time = parse(d["start_time"]).astimezone(pytz.utc).replace(tzinfo=None)
1✔
886
                        if d.get("end_time"):
1✔
887
                            t.end_time = parse(d["end_time"]).astimezone(pytz.utc).replace(tzinfo=None)
1✔
888
                        if d.get("enabled"):
1✔
889
                            t.enabled = True
1✔
890
                        else:
891
                            t.enabled = False
×
892
                        # for rubric in d["rubric"]:
893
                        #     if rubric.get("id"):
894
                        #         rubric_id = rubric["id"]
895
                        #     r = db.session.get(Rubric, int(rubric_id))
896
                        #     # Update rubric if it exists
897
                        #     if r:
898
                        #         if rubric.get("value"):
899
                        #             r.value = rubric["value"]
900
                        #         if rubric.get("deliverable"):
901
                        #             r.deliverable = rubric["deliverable"]
902
                        #     # Otherwise, create the rubric
903
                        #     else:
904
                        #         r = Rubric(
905
                        #             value=rubric["value"],
906
                        #             deliverable=rubric["deliverable"],
907
                        #             template=template,
908
                        #         )
909
                        #         db.session.add(r)
910
                        # Generate injects from template
911
                        if d.get("selectedTeams"):
1✔
912
                            for team_name in data["selectedTeams"]:
×
913
                                inject = (
×
914
                                    db.session.query(Inject)
915
                                    .join(Template)
916
                                    .join(Team)
917
                                    .filter(Team.name == team_name)
918
                                    .filter(Template.id == template_id)
919
                                    .one_or_none()
920
                                )
921
                                # Update inject if it exists
922
                                if inject:
×
923
                                    inject.enabled = True
×
924
                                # Otherwise, create the inject
925
                                else:
926
                                    team = db.session.query(Team).filter(Team.name == team_name).first()
×
927
                                    inject = Inject(
×
928
                                        team=team,
929
                                        template=template,
930
                                    )
931
                                    db.session.add(inject)
×
932
                        if d.get("unselectedTeams"):
1✔
933
                            injects = (
×
934
                                db.session.query(Inject)
935
                                .join(Template)
936
                                .join(Team)
937
                                .filter(Team.name.in_(data["unselectedTeams"]))
938
                                .filter(Template.id == template_id)
939
                                .all()
940
                            )
941
                            for inject in injects:
×
942
                                inject.enabled = False
×
943

944
                    else:
945
                        # Template ID doesn't exist, fall through to create a new one
946
                        d.pop("id")
1✔
947
                if not d.get("id"):
1✔
948
                    # Create the template
949
                    t = Template(
1✔
950
                        title=d["title"],
951
                        scenario=d["scenario"],
952
                        deliverable=d["deliverable"],
953
                        start_time=parse(d["start_time"]).astimezone(pytz.utc).replace(tzinfo=None),
954
                        end_time=parse(d["end_time"]).astimezone(pytz.utc).replace(tzinfo=None),
955
                        enabled=d["enabled"],
956
                    )
957
                    db.session.add(t)
1✔
958
                    db.session.flush()
1✔
959
                    # Create rubric items (backward compat: if only "score" exists, create default item)
960
                    if d.get("rubric_items"):
1✔
961
                        for idx, item_data in enumerate(d["rubric_items"]):
1✔
962
                            rubric_item = RubricItem(
1✔
963
                                title=item_data["title"],
964
                                description=item_data.get("description", ""),
965
                                points=item_data["points"],
966
                                order=item_data.get("order", idx),
967
                                template=t,
968
                            )
969
                            db.session.add(rubric_item)
1✔
970
                    elif d.get("score"):
×
971
                        rubric_item = RubricItem(
×
972
                            title="Overall Quality",
973
                            description="",
974
                            points=int(d["score"]),
975
                            order=0,
976
                            template=t,
977
                        )
978
                        db.session.add(rubric_item)
×
979
                    for team_name in d["teams"]:
1✔
980
                        inject = (
1✔
981
                            db.session.query(Inject)
982
                            .join(Template)
983
                            .join(Team)
984
                            .filter(Team.name == team_name)
985
                            .filter(Template.id == t.id)
986
                            .one_or_none()
987
                        )
988
                        # Update inject if it exists
989
                        if inject:
1✔
990
                            inject.enabled = True
×
991
                        # Otherwise, create the inject
992
                        else:
993
                            team = db.session.query(Team).filter(Team.name == team_name).first()
1✔
994
                            if team:
1✔
995
                                inject = Inject(
1✔
996
                                    team=team,
997
                                    template=t,
998
                                )
999
                                db.session.add(inject)
1✔
1000
            db.session.commit()
1✔
1001
            return jsonify({"status": "Success"}), 200
1✔
1002
        else:
1003
            return jsonify({"status": "Error", "message": "Invalid Data"}), 400
1✔
1004
    else:
1005
        return {"status": "Unauthorized"}, 403
1✔
1006

1007

1008
# TODO - This is way too many database queries
1009
@mod.route("/api/admin/injects/scores")
1✔
1010
@login_required
1✔
1011
def admin_inject_scores():
1✔
1012
    if current_user.is_white_team:
1✔
1013
        data = {}
1✔
1014

1015
        injects = (
1✔
1016
            db.session.query(Inject)
1017
            .options(joinedload(Inject.template), joinedload(Inject.team))
1018
            .order_by(Inject.template_id)
1019
            .order_by(Inject.team_id)
1020
            .all()
1021
        )
1022

1023
        for inject in injects:
1✔
1024
            if inject.team is None:
1✔
1025
                continue
1✔
1026
            if inject.template.id not in data:
1✔
1027
                data[inject.template.id] = {
1✔
1028
                    "title": inject.template.title,
1029
                    "end_time": inject.template.end_time,
1030
                }
1031
            if inject.team.name not in data[inject.template.id]:
1✔
1032
                data[inject.template.id][inject.team.name] = {
1✔
1033
                    "id": inject.id,
1034
                    "score": inject.score,
1035
                    "status": inject.status,
1036
                    "max_score": inject.template.max_score,
1037
                }
1038

1039
        # Rewrite data to be in the format expected by the frontend
1040
        score_data = []
1✔
1041
        for x in data.items():
1✔
1042
            score_data.append(x[1])
1✔
1043

1044
        return jsonify(data=score_data), 200
1✔
1045
    else:
1046
        return {"status": "Unauthorized"}, 403
×
1047

1048

1049
@mod.route("/api/admin/injects/get_bar_chart")
1✔
1050
@login_required
1✔
1051
def admin_injects_bar():
1✔
1052
    if current_user.is_white_team:
×
1053
        inject_scores = dict(
×
1054
            db.session.query(Inject.team_id, func.sum(InjectRubricScore.score))
1055
            .join(InjectRubricScore)
1056
            .filter(Inject.status == "Graded")
1057
            .group_by(Inject.team_id)
1058
            .all()
1059
        )
1060

1061
        team_data = {}
×
1062
        team_labels = []
×
1063
        team_inject_scores = []
×
1064
        blue_teams = db.session.query(Team).filter(Team.color == "Blue").order_by(Team.name).all()
×
1065
        for blue_team in blue_teams:
×
1066
            team_labels.append(blue_team.name)
×
1067
            team_inject_scores.append(str(inject_scores.get(blue_team.id, 0)))
×
1068

1069
        team_data["labels"] = team_labels
×
1070
        team_data["inject_scores"] = team_inject_scores
×
1071

1072
        return jsonify(team_data), 200
×
1073
    else:
1074
        return {"status": "Unauthorized"}, 403
×
1075

1076

1077
@mod.route("/api/admin/admin_update_template", methods=["POST"])
1✔
1078
@login_required
1✔
1079
def admin_update_template():
1✔
1080
    if current_user.is_white_team:
×
1081
        if "name" in request.form and "value" in request.form and "pk" in request.form:
×
1082
            template = db.session.get(Template, int(request.form["pk"]))
×
1083
            if template:
×
1084
                modified_check = False
×
1085
                if request.form["name"] == "template_state":
×
1086
                    template.state = request.form["value"]
×
1087
                    modified_check = True
×
1088
                elif request.form["name"] == "template_points":
×
1089
                    template.points = request.form["value"]
×
1090
                    modified_check = True
×
1091
                if modified_check:
×
1092
                    db.session.add(template)
×
1093
                    db.session.commit()
×
1094
                    # update_scoreboard_data()
1095
                    # update_overview_data()
1096
                    # update_services_navbar(check.service.team.id)
1097
                    # update_team_stats(check.service.team.id)
1098
                    # update_services_data(check.service.team.id)
1099
                    # update_service_data(check.service.id)
1100
                    return jsonify({"status": "Updated Property Information"})
×
1101
            return jsonify({"error": "Template Not Found"})
×
1102
    return jsonify({"error": "Incorrect permissions"})
×
1103

1104

1105
# @mod.route("/api/admin/injects/team/<team_id>")
1106
# @login_required
1107
# def admin_get_team_injects(team_id):
1108
#     if current_user.is_white_team:
1109
#         injects = db.session.query(Inject).filter(team_id == team_id).all()
1110
#         return jsonify(data=injects)
1111
#     else:
1112
#         return {"status": "Unauthorized"}, 403
1113

1114

1115
@mod.route("/api/admin/get_teams")
1✔
1116
@login_required
1✔
1117
def admin_get_teams():
1✔
1118
    if current_user.is_white_team:
×
1119
        all_teams = db.session.query(Team).all()
×
1120
        data = []
×
1121
        for team in all_teams:
×
1122
            users = {}
×
1123
            for user in team.users:
×
1124
                users[user.username] = [user.password, str(user.authenticated).title()]
×
1125
            data.append({"name": team.name, "color": team.color, "users": users})
×
1126
        return jsonify(data=data)
×
1127
    else:
1128
        return {"status": "Unauthorized"}, 403
×
1129

1130

1131
@mod.route("/api/admin/update_password", methods=["POST"])
1✔
1132
@login_required
1✔
1133
def admin_update_password():
1✔
1134
    if current_user.is_white_team:
×
1135
        if "user_id" in request.form and "password" in request.form:
×
1136
            try:
×
1137
                user_obj = db.session.query(User).filter(User.id == request.form["user_id"]).one()
×
1138
            except NoResultFound:
×
1139
                return redirect(url_for("auth.login"))
×
1140
            user_obj.update_password(html.escape(request.form["password"]))
×
1141
            user_obj.authenticated = False
×
1142
            db.session.add(user_obj)
×
1143
            db.session.commit()
×
1144
            flash("Password Successfully Updated.", "success")
×
1145
            return redirect(url_for("admin.manage"))
×
1146
        else:
1147
            flash("Error: user_id or password not specified.", "danger")
×
1148
            return redirect(url_for("admin.manage"))
×
1149
    else:
1150
        return {"status": "Unauthorized"}, 403
×
1151

1152

1153
@mod.route("/api/admin/add_user", methods=["POST"])
1✔
1154
@login_required
1✔
1155
def admin_add_user():
1✔
1156
    if current_user.is_white_team:
×
1157
        if "username" in request.form and "password" in request.form and "team_id" in request.form:
×
1158
            team_obj = db.session.query(Team).filter(Team.id == request.form["team_id"]).one()
×
1159
            user_obj = User(
×
1160
                username=html.escape(request.form["username"]),
1161
                password=html.escape(request.form["password"]),
1162
                team=team_obj,
1163
            )
1164
            db.session.add(user_obj)
×
1165
            db.session.commit()
×
1166
            flash("User successfully added.", "success")
×
1167
            return redirect(url_for("admin.manage"))
×
1168
        else:
1169
            flash("Error: Username, Password, or Team ID not specified.", "danger")
×
1170
            return redirect(url_for("admin.manage"))
×
1171
    else:
1172
        return {"status": "Unauthorized"}, 403
×
1173

1174

1175
@mod.route("/api/admin/add_team", methods=["POST"])
1✔
1176
@login_required
1✔
1177
def admin_add_team():
1✔
1178
    if current_user.is_white_team:
×
1179
        if "name" in request.form and "color" in request.form:
×
1180
            team_obj = Team(html.escape(request.form["name"]), html.escape(request.form["color"]))
×
1181
            db.session.add(team_obj)
×
1182
            db.session.commit()
×
1183
            flash("Team successfully added.", "success")
×
1184
            return redirect(url_for("admin.manage"))
×
1185
        else:
1186
            flash("Error: Team name or color not defined.", "danger")
×
1187
            return redirect(url_for("admin.manage"))
×
1188
    else:
1189
        return {"status": "Unauthorized"}, 403
×
1190

1191

1192
@mod.route("/api/admin/toggle_inject_scores_visible", methods=["POST"])
1✔
1193
@login_required
1✔
1194
def admin_toggle_inject_scores_visible():
1✔
1195
    if current_user.is_white_team:
1✔
1196
        Setting.clear_cache("inject_scores_visible")
1✔
1197
        setting = Setting.get_setting("inject_scores_visible")
1✔
1198
        setting.value = not setting.value
1✔
1199
        db.session.add(setting)
1✔
1200
        db.session.commit()
1✔
1201
        Setting.clear_cache("inject_scores_visible")
1✔
1202
        update_scoreboard_data()
1✔
1203
        update_all_inject_data()
1✔
1204
        return {"status": "Success"}
1✔
1205
    else:
1206
        return {"status": "Unauthorized"}, 403
1✔
1207

1208

1209
@mod.route("/api/admin/toggle_engine", methods=["POST"])
1✔
1210
@login_required
1✔
1211
def admin_toggle_engine():
1✔
1212
    if current_user.is_white_team:
1✔
1213
        Setting.clear_cache("engine_paused")
1✔
1214
        setting = Setting.get_setting("engine_paused")
1✔
1215
        setting.value = not setting.value
1✔
1216
        db.session.add(setting)
1✔
1217
        db.session.commit()
1✔
1218
        Setting.clear_cache("engine_paused")
1✔
1219
        return {"status": "Success"}
1✔
1220
    else:
1221
        return {"status": "Unauthorized"}, 403
1✔
1222

1223

1224
@mod.route("/api/admin/get_engine_stats")
1✔
1225
@login_required
1✔
1226
def admin_get_engine_stats():
1✔
1227
    if current_user.is_white_team:
1✔
1228
        engine_stats = {}
1✔
1229
        engine_stats["round_number"] = Round.get_last_round_num()
1✔
1230
        engine_stats["num_passed_checks"] = db.session.query(Check).filter_by(result=True).count()
1✔
1231
        engine_stats["num_failed_checks"] = db.session.query(Check).filter_by(result=False).count()
1✔
1232
        engine_stats["total_checks"] = db.session.query(Check).count()
1✔
1233
        return jsonify(engine_stats)
1✔
1234
    else:
1235
        return {"status": "Unauthorized"}, 403
×
1236

1237

1238
@mod.route("/api/admin/get_engine_paused")
1✔
1239
@login_required
1✔
1240
def admin_get_engine_status():
1✔
1241
    if current_user.is_white_team:
1✔
1242
        return jsonify({"paused": Setting.get_setting("engine_paused").value})
1✔
1243
    else:
1244
        return {"status": "Unauthorized"}, 403
1✔
1245

1246

1247
@mod.route("/api/admin/get_worker_stats")
1✔
1248
@login_required
1✔
1249
def admin_get_worker_stats():
1✔
1250
    if current_user.is_white_team:
×
1251
        worker_stats = CeleryStats.get_worker_stats()
×
1252
        return jsonify(data=worker_stats)
×
1253
    else:
1254
        return {"status": "Unauthorized"}, 403
×
1255

1256

1257
@mod.route("/api/admin/get_queue_stats")
1✔
1258
@login_required
1✔
1259
def admin_get_queue_stats():
1✔
1260
    if current_user.is_white_team:
×
1261
        queue_stats = CeleryStats.get_queue_stats()
×
1262
        return jsonify(data=queue_stats)
×
1263
    else:
1264
        return {"status": "Unauthorized"}, 403
×
1265

1266

1267
@mod.route("/api/admin/get_competition_summary")
1✔
1268
@login_required
1✔
1269
def admin_get_competition_summary():
1✔
1270
    if current_user.is_white_team:
×
1271
        blue_teams = db.session.query(Team).filter(Team.color == "Blue").all()
×
1272
        total_services = db.session.query(Service).join(Team).filter(Team.color == "Blue").count()
×
1273
        total_checks = db.session.query(Check).count()
×
1274
        passed_checks = db.session.query(Check).filter_by(result=True).count()
×
1275

1276
        overall_uptime = 0.0
×
1277
        if total_checks > 0:
×
1278
            overall_uptime = round((passed_checks / total_checks) * 100, 1)
×
1279

1280
        last_round = Round.get_last_round_num()
×
1281
        currently_passing = 0
×
1282
        if last_round > 0:
×
1283
            currently_passing = (
×
1284
                db.session.query(Check).join(Round).filter(Round.number == last_round, Check.result == True).count()
1285
            )
1286

1287
        return jsonify(
×
1288
            {
1289
                "blue_teams": len(blue_teams),
1290
                "total_services": total_services,
1291
                "currently_passing": currently_passing,
1292
                "overall_uptime": overall_uptime,
1293
            }
1294
        )
1295
    else:
1296
        return {"status": "Unauthorized"}, 403
×
1297

1298

1299
@mod.route("/api/admin/welcome/config", methods=["GET"])
1✔
1300
@login_required
1✔
1301
def admin_get_welcome_config():
1✔
1302
    """Get the welcome page configuration."""
1303
    if current_user.is_white_team:
×
1304
        config = get_welcome_config()
×
1305
        return jsonify(data=config)
×
1306
    else:
1307
        return {"status": "Unauthorized"}, 403
×
1308

1309

1310
@mod.route("/api/admin/welcome/config", methods=["PUT"])
1✔
1311
@login_required
1✔
1312
def admin_update_welcome_config():
1✔
1313
    """Update the welcome page configuration."""
1314
    if current_user.is_white_team:
×
1315
        data = request.get_json()
×
1316
        if not data:
×
1317
            return (
×
1318
                jsonify(
1319
                    {
1320
                        "status": "Error",
1321
                        "message": "No data provided",
1322
                    }
1323
                ),
1324
                400,
1325
            )
1326

1327
        try:
×
1328
            config = save_welcome_config(data)
×
1329
            return jsonify({"status": "Success", "data": config})
×
1330
        except ValueError as e:
×
1331
            return jsonify({"status": "Error", "message": str(e)}), 400
×
1332
    else:
1333
        return {"status": "Unauthorized"}, 403
×
1334

1335

1336
@mod.route("/api/admin/welcome/upload_logo", methods=["POST"])
1✔
1337
@login_required
1✔
1338
def admin_upload_sponsor_logo():
1✔
1339
    """Upload a sponsor logo image."""
1340
    import os
×
1341

1342
    from werkzeug.utils import secure_filename
×
1343

1344
    if not current_user.is_white_team:
×
1345
        return jsonify({"status": "Unauthorized"}), 403
×
1346

1347
    if "file" not in request.files:
×
1348
        return (
×
1349
            jsonify(
1350
                {
1351
                    "status": "Error",
1352
                    "message": "No file provided",
1353
                }
1354
            ),
1355
            400,
1356
        )
1357

1358
    file = request.files["file"]
×
1359
    if file.filename == "":
×
1360
        return (
×
1361
            jsonify(
1362
                {
1363
                    "status": "Error",
1364
                    "message": "No file selected",
1365
                }
1366
            ),
1367
            400,
1368
        )
1369

1370
    # Validate file extension
1371
    allowed = {"png", "jpg", "jpeg", "gif", "svg", "webp"}
×
1372
    ext = file.filename.rsplit(".", 1)[-1].lower() if "." in file.filename else ""
×
1373
    if ext not in allowed:
×
1374
        return (
×
1375
            jsonify(
1376
                {
1377
                    "status": "Error",
1378
                    "message": "File type not allowed. Use: " + ", ".join(sorted(allowed)),
1379
                }
1380
            ),
1381
            400,
1382
        )
1383

1384
    filename = secure_filename(file.filename)
×
1385

1386
    # Save to the shared upload folder (Docker volume), not the static dir.
1387
    # Static files are served by nginx from a separate container/mount,
1388
    # so uploads to the web container's static/ dir would be inaccessible.
1389
    sponsors_dir = os.path.join(config.upload_folder, "sponsors")
×
1390
    os.makedirs(sponsors_dir, exist_ok=True)
×
1391

1392
    # Avoid overwriting: add suffix if file exists
1393
    filepath = os.path.join(sponsors_dir, filename)
×
1394
    if os.path.exists(filepath):
×
1395
        name, dot_ext = os.path.splitext(filename)
×
1396
        counter = 1
×
1397
        while os.path.exists(filepath):
×
1398
            filename = f"{name}_{counter}{dot_ext}"
×
1399
            filepath = os.path.join(sponsors_dir, filename)
×
1400
            counter += 1
×
1401

1402
    file.save(filepath)
×
1403

1404
    url = f"/api/admin/welcome/sponsor_logo/{filename}"
×
1405
    return jsonify({"status": "Success", "url": url})
×
1406

1407

1408
@mod.route("/api/admin/welcome/sponsor_logo/<filename>")
1✔
1409
def serve_sponsor_logo(filename):
1✔
1410
    """Serve a sponsor logo from the upload folder."""
1411
    import os
×
1412

1413
    from flask import abort, send_from_directory
×
1414
    from werkzeug.utils import secure_filename
×
1415

1416
    safe_filename = secure_filename(filename)
×
1417
    if not safe_filename:
×
1418
        abort(404)
×
1419
    sponsors_dir = os.path.join(config.upload_folder, "sponsors")
×
1420
    return send_from_directory(sponsors_dir, safe_filename)
×
1421

1422

1423
# Global cache for loaded check classes (loaded once per app lifecycle)
1424
_check_classes = None
1✔
1425

1426

1427
def _get_check_classes():
1✔
1428
    """Load and cache all check classes."""
1429
    global _check_classes
1430
    if _check_classes is None:
×
1431
        _check_classes = {}
×
1432
        loaded_checks = Engine.load_check_files(config.checks_location)
×
1433
        for check_class in loaded_checks:
×
1434
            _check_classes[check_class.__name__] = check_class
×
1435
    return _check_classes
×
1436

1437

1438
@mod.route("/api/admin/check/dry_run", methods=["POST"])
1✔
1439
@login_required
1✔
1440
def admin_check_dry_run():
1✔
1441
    """
1442
    Run a service check without recording results to the database.
1443
    Dispatches to a Celery worker so the check runs in an environment
1444
    that has the required tools (SSH client, DNS utils, etc.).
1445
    """
1446
    if not current_user.is_white_team:
1✔
1447
        return jsonify({"status": "Unauthorized"}), 403
1✔
1448

1449
    data = request.get_json()
1✔
1450
    if not data or "service_id" not in data:
1✔
1451
        return jsonify({"status": "error", "message": "service_id is required"}), 400
1✔
1452

1453
    service_id = data.get("service_id")
1✔
1454
    environment_id = data.get("environment_id")
1✔
1455

1456
    service = db.session.get(Service, int(service_id))
1✔
1457
    if not service:
1✔
1458
        return jsonify({"status": "error", "message": f"Service with id {service_id} not found"}), 404
1✔
1459

1460
    # Get environment (specific or random)
1461
    if environment_id:
1✔
1462
        environment = db.session.get(Environment, int(environment_id))
1✔
1463
        if not environment or environment.service_id != service.id:
1✔
1464
            return jsonify({"status": "error", "message": f"Environment {environment_id} not found for service"}), 404
×
1465
    else:
1466
        if not service.environments:
1✔
1467
            return jsonify({"status": "error", "message": "Service has no environments configured"}), 400
×
1468
        import random
1✔
1469

1470
        environment = random.choice(service.environments)
1✔
1471

1472
    # Load check class and generate command
1473
    check_classes = _get_check_classes()
1✔
1474
    check_class = check_classes.get(service.check_name)
1✔
1475
    if not check_class:
1✔
1476
        return jsonify({"status": "error", "message": f"Check class '{service.check_name}' not found"}), 404
1✔
1477

1478
    try:
1✔
1479
        check_obj = check_class(environment)
1✔
1480
        command = check_obj.command()
1✔
1481
    except LookupError as e:
×
1482
        return jsonify({"status": "error", "message": f"Check configuration error: {str(e)}"}), 400
×
1483
    except Exception as e:
×
1484
        return jsonify({"status": "error", "message": f"Error instantiating check: {str(e)}"}), 500
×
1485

1486
    # Dispatch to a Celery worker via the service's worker queue
1487
    import time
1✔
1488

1489
    job = {"command": command}
1✔
1490
    start_time = time.time()
1✔
1491
    try:
1✔
1492
        result = execute_command.apply_async(args=[job], queue=service.worker_queue)
1✔
1493
        completed_job = result.get(timeout=35)
1✔
1494
    except Exception as e:
×
1495
        return jsonify({"status": "error", "message": f"Worker execution failed: {str(e)}"}), 500
×
1496
    execution_time_ms = int((time.time() - start_time) * 1000)
1✔
1497

1498
    # Evaluate result against matching_content
1499
    if completed_job.get("errored_out"):
1✔
1500
        check_result = False
1✔
1501
        reason = CHECK_TIMED_OUT_TEXT
1✔
1502
    else:
1503
        output = completed_job.get("output", "")
1✔
1504
        if re.search(environment.matching_content, output):
1✔
1505
            check_result = True
1✔
1506
            reason = CHECK_SUCCESS_TEXT
1✔
1507
        else:
1508
            check_result = False
1✔
1509
            reason = CHECK_FAILURE_TEXT
1✔
1510

1511
    return jsonify(
1✔
1512
        {
1513
            "status": "success",
1514
            "result": check_result,
1515
            "reason": reason,
1516
            "output": completed_job.get("output", "")[:35000],
1517
            "command": command,
1518
            "execution_time_ms": execution_time_ms,
1519
            "service_name": service.name,
1520
            "team_name": service.team.name,
1521
            "host": service.host,
1522
            "port": service.port,
1523
            "matching_content": environment.matching_content,
1524
            "environment_id": environment.id,
1525
        }
1526
    )
1527

1528

1529
@mod.route("/api/admin/rollback", methods=["POST"])
1✔
1530
@login_required
1✔
1531
def admin_rollback():
1✔
1532
    """
1533
    Rollback competition data to a specific round.
1534

1535
    Deletes all rounds, checks, and KB entries from the specified round onwards.
1536
    This is a destructive operation - use with caution.
1537

1538
    Request JSON:
1539
    {
1540
        "round_number": 10,  // Delete this round and all after it
1541
        "confirm": true      // Required to confirm destructive action
1542
    }
1543
    """
1544
    if not current_user.is_white_team:
1✔
1545
        return jsonify({"status": "Unauthorized"}), 403
1✔
1546

1547
    data = request.get_json()
1✔
1548
    if not data:
1✔
1549
        return jsonify({"status": "error", "message": "Request body required"}), 400
×
1550

1551
    round_number = data.get("round_number")
1✔
1552
    confirm = data.get("confirm", False)
1✔
1553

1554
    if round_number is None:
1✔
1555
        return jsonify({"status": "error", "message": "round_number is required"}), 400
1✔
1556

1557
    try:
1✔
1558
        round_number = int(round_number)
1✔
1559
    except (ValueError, TypeError):
1✔
1560
        return jsonify({"status": "error", "message": "round_number must be an integer"}), 400
1✔
1561

1562
    if round_number < 1:
1✔
1563
        return jsonify({"status": "error", "message": "round_number must be >= 1"}), 400
1✔
1564

1565
    if not confirm:
1✔
1566
        return jsonify({"status": "error", "message": "confirm=true required for destructive operation"}), 400
1✔
1567

1568
    # Get current round count for validation
1569
    current_round = Round.get_last_round_num()
1✔
1570
    if current_round == 0:
1✔
1571
        return jsonify({"status": "error", "message": "No rounds exist to rollback"}), 400
1✔
1572

1573
    if round_number > current_round:
1✔
1574
        return jsonify(
1✔
1575
            {"status": "error", "message": f"round_number ({round_number}) exceeds current round ({current_round})"}
1576
        ), 400
1577

1578
    # Pause the engine to prevent race conditions during rollback
1579
    was_paused = Setting.get_setting("engine_paused").value
1✔
1580
    if not was_paused:
1✔
1581
        setting = Setting.get_setting("engine_paused")
1✔
1582
        setting.value = True
1✔
1583
        db.session.add(setting)
1✔
1584
        db.session.commit()
1✔
1585
        Setting.clear_cache("engine_paused")
1✔
1586
        # Wait for the engine to finish its current round and notice the pause
1587
        time.sleep(5)
1✔
1588

1589
    try:
1✔
1590
        # Revoke any pending Celery tasks for rounds being rolled back
1591
        task_kb_entries = (
1✔
1592
            db.session.query(KB)
1593
            .filter(KB.round_num >= round_number, KB.name == "task_ids")
1594
            .all()
1595
        )
1596
        revoked_count = 0
1✔
1597
        for kb_entry in task_kb_entries:
1✔
1598
            try:
1✔
1599
                task_dict = json.loads(kb_entry.value)
1✔
1600
                for team_name, task_id_list in task_dict.items():
1✔
1601
                    for task_id in task_id_list:
×
1602
                        execute_command.AsyncResult(task_id).revoke(terminate=True)
×
1603
                        revoked_count += 1
×
1604
            except (json.JSONDecodeError, TypeError):
×
1605
                pass
×
1606

1607
        # Get rounds to delete
1608
        rounds_to_delete = db.session.query(Round).filter(Round.number >= round_number).all()
1✔
1609
        round_ids = [r.id for r in rounds_to_delete]
1✔
1610

1611
        # Count checks to delete
1612
        checks_count = db.session.query(Check).filter(Check.round_id.in_(round_ids)).count()
1✔
1613

1614
        # Count KB entries to delete
1615
        kb_count = db.session.query(KB).filter(KB.round_num >= round_number).count()
1✔
1616

1617
        # Delete checks in batches to avoid lock wait timeout
1618
        BATCH_SIZE = 500
1✔
1619
        for i in range(0, len(round_ids), BATCH_SIZE):
1✔
1620
            batch_ids = round_ids[i : i + BATCH_SIZE]
1✔
1621
            db.session.query(Check).filter(Check.round_id.in_(batch_ids)).delete(synchronize_session=False)
1✔
1622
            db.session.commit()
1✔
1623

1624
        # Delete KB entries
1625
        db.session.query(KB).filter(KB.round_num >= round_number).delete(synchronize_session=False)
1✔
1626
        db.session.commit()
1✔
1627

1628
        # Delete rounds
1629
        rounds_count = len(rounds_to_delete)
1✔
1630
        db.session.query(Round).filter(Round.number >= round_number).delete(synchronize_session=False)
1✔
1631
        db.session.commit()
1✔
1632

1633
    finally:
1634
        # Unpause engine if we paused it (even on error)
1635
        if not was_paused:
1✔
1636
            Setting.clear_cache("engine_paused")
1✔
1637
            setting = Setting.get_setting("engine_paused")
1✔
1638
            setting.value = False
1✔
1639
            db.session.add(setting)
1✔
1640
            db.session.commit()
1✔
1641
            Setting.clear_cache("engine_paused")
1✔
1642

1643
    # Clear all caches to force recalculation
1644
    from flask import current_app
1✔
1645

1646
    update_all_cache(current_app)
1✔
1647

1648
    return jsonify(
1✔
1649
        {
1650
            "status": "success",
1651
            "message": f"Rolled back to before round {round_number}",
1652
            "deleted": {
1653
                "rounds": rounds_count,
1654
                "checks": checks_count,
1655
                "kb_entries": kb_count,
1656
            },
1657
            "new_current_round": Round.get_last_round_num(),
1658
        }
1659
    )
1660

1661

1662
@mod.route("/api/admin/rollback/preview", methods=["POST"])
1✔
1663
@login_required
1✔
1664
def admin_rollback_preview():
1✔
1665
    """Preview what would be deleted by a rollback operation."""
1666
    if not current_user.is_white_team:
1✔
1667
        return jsonify({"status": "Unauthorized"}), 403
1✔
1668

1669
    data = request.get_json()
1✔
1670
    if not data:
1✔
1671
        return jsonify({"status": "error", "message": "Request body required"}), 400
1✔
1672

1673
    round_number = data.get("round_number")
1✔
1674
    if round_number is None:
1✔
1675
        return jsonify({"status": "error", "message": "round_number is required"}), 400
×
1676

1677
    try:
1✔
1678
        round_number = int(round_number)
1✔
1679
    except (ValueError, TypeError):
×
1680
        return jsonify({"status": "error", "message": "round_number must be an integer"}), 400
×
1681

1682
    if round_number < 1:
1✔
1683
        return jsonify({"status": "error", "message": "round_number must be >= 1"}), 400
×
1684

1685
    current_round = Round.get_last_round_num()
1✔
1686
    if current_round == 0:
1✔
1687
        return jsonify(
1✔
1688
            {
1689
                "status": "success",
1690
                "current_round": 0,
1691
                "round_number": round_number,
1692
                "will_delete": {"rounds": 0, "checks": 0, "kb_entries": 0},
1693
            }
1694
        )
1695

1696
    # Get rounds that would be deleted
1697
    rounds_to_delete = db.session.query(Round).filter(Round.number >= round_number).all()
1✔
1698
    round_ids = [r.id for r in rounds_to_delete]
1✔
1699

1700
    # Count checks
1701
    checks_count = db.session.query(Check).filter(Check.round_id.in_(round_ids)).count() if round_ids else 0
1✔
1702

1703
    # Count KB entries
1704
    kb_count = db.session.query(KB).filter(KB.round_num >= round_number).count()
1✔
1705

1706
    return jsonify(
1✔
1707
        {
1708
            "status": "success",
1709
            "current_round": current_round,
1710
            "round_number": round_number,
1711
            "will_delete": {
1712
                "rounds": len(rounds_to_delete),
1713
                "checks": checks_count,
1714
                "kb_entries": kb_count,
1715
            },
1716
        }
1717
    )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc