• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

scoringengine / scoringengine / 23385248069

21 Mar 2026 05:50PM UTC coverage: 73.202% (-2.5%) from 75.69%
23385248069

push

github

RustyBower
Fix test to match DB fallback behavior for missing output files

The endpoint now returns check.output from DB (200) instead of 404
when the on-disk file doesn't exist.

3726 of 5090 relevant lines covered (73.2%)

0.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.68
/scoring_engine/engine/engine.py
1
import importlib
1✔
2
import importlib.util
1✔
3
import inspect
1✔
4
import json
1✔
5
import os
1✔
6
import random
1✔
7
import re
1✔
8
import signal
1✔
9
import sys
1✔
10
import time
1✔
11
from datetime import datetime
1✔
12
from functools import partial
1✔
13
from pathlib import Path
1✔
14

15
from flask import current_app
1✔
16
from sqlalchemy.orm import selectinload
1✔
17

18
from scoring_engine.cache_helper import update_all_cache
1✔
19
from scoring_engine.config import config
1✔
20
from scoring_engine.db import db
1✔
21
from scoring_engine.engine.basic_check import CHECK_FAILURE_TEXT, CHECK_SUCCESS_TEXT, CHECK_TIMED_OUT_TEXT
1✔
22
from scoring_engine.engine.execute_command import execute_command
1✔
23
from scoring_engine.engine.job import Job
1✔
24
from scoring_engine.logger import logger
1✔
25
from scoring_engine.models.check import Check
1✔
26
from scoring_engine.models.environment import Environment
1✔
27
from scoring_engine.models.kb import KB
1✔
28
from scoring_engine.models.round import Round
1✔
29
from scoring_engine.models.property import Property
1✔
30
from scoring_engine.models.service import Service
1✔
31
from scoring_engine.models.setting import Setting
1✔
32

33

34
def engine_sigint_handler(signum, frame, engine):
    """Signal handler that asks *engine* to stop gracefully.

    Registered via functools.partial for SIGINT/SIGTERM; the extra
    *engine* argument is bound at registration time.
    """
    engine.shutdown()
36

37

38
class Engine(object):
    """Round orchestrator for the scoring engine.

    Loads check plugins from disk, then repeatedly dispatches one Celery
    check task per service, waits for workers, and persists results.
    """

    def __init__(self, total_rounds=0):
        """Initialize engine state and register shutdown signal handlers.

        Args:
            total_rounds: number of rounds to run; 0 means run until
                a shutdown signal is received.
        """
        self.checks = []
        self.total_rounds = total_rounds

        self.config = config
        self.checks_location = self.config.checks_location

        # Keep reference to db for backward compatibility
        self.db = db

        # Exits the process (exit(1)) if any required setting is missing.
        self.verify_settings()

        self.last_round = False
        self.rounds_run = 0

        # SIGINT/SIGTERM request a graceful stop: the current round (if any)
        # finishes before the loop exits.
        signal.signal(signal.SIGINT, partial(engine_sigint_handler, engine=self))
        signal.signal(signal.SIGTERM, partial(engine_sigint_handler, engine=self))

        # Resume round numbering from whatever the DB last recorded.
        self.current_round = Round.get_last_round_num()

        self.load_checks()
        self.round_running = False
61

62
    def verify_settings(self):
        """Ensure every required engine setting exists; exit(1) on the first missing one."""
        required = ["target_round_time", "worker_refresh_time", "engine_paused", "pause_duration"]
        for name in required:
            if Setting.get_setting(name):
                continue
            logger.error("Must have " + name + " setting.")
            exit(1)
68

69
    def shutdown(self):
        """Request a graceful stop; an in-flight round is allowed to finish first."""
        message = "Shutting down after this round..." if self.round_running else "Shutting down now."
        logger.warning(message)
        self.last_round = True
75

76
    def add_check(self, check_obj):
1✔
77
        self.checks.append(check_obj)
1✔
78
        self.checks = sorted(self.checks, key=lambda check: check.__name__)
1✔
79
        self._check_map = {check.__name__: check for check in self.checks}
1✔
80

81
    def load_checks(self):
        """Discover check classes in the configured directory and register each one."""
        logger.debug("Loading checks source from " + str(self.checks_location))
        for check_class in Engine.load_check_files(self.checks_location):
            logger.debug(" Found " + check_class.__name__)
            self.add_check(check_class)
87

88
    # @staticmethod
89
    # def load_check_files(checks_location):
90
    #     checks_location_module_str = checks_location.replace("/", ".")
91
    #     found_check_modules = pynsive.list_modules(checks_location_module_str)
92
    #     found_checks = []
93
    #     for found_module in found_check_modules:
94
    #         module_obj = pynsive.import_module(found_module)
95
    #         for name, arg in inspect.getmembers(module_obj):
96
    #             if name == "BasicCheck" or name == "HTTPPostCheck":
97
    #                 continue
98
    #             elif not name.endswith("Check"):
99
    #                 continue
100
    #             found_checks.append(arg)
101
    #     return found_checks
102

103
    @staticmethod
1✔
104
    def load_check_files(checks_location):
1✔
105
        found_checks = []
1✔
106
        checks_path = Path(checks_location)
1✔
107

108
        if not checks_path.is_dir():
1✔
109
            raise ValueError(f"{checks_location} is not a valid directory.")
×
110

111
        # Iterate through the checks directory to find Python files
112
        for py_file in checks_path.glob("*.py"):
1✔
113
            module_name = py_file.stem  # Get the filename without the `.py` extension
1✔
114
            module_path = str(py_file.resolve())
1✔
115

116
            # Convert file path to module format (dot-separated)
117
            checks_location_module_str = str(checks_path.resolve()).replace("/", ".")
1✔
118
            relative_module_path = os.path.relpath(module_path, str(checks_path.parent))
1✔
119
            module_str = relative_module_path.replace("/", ".").replace(".py", "")
1✔
120

121
            # Import the module dynamically
122
            spec = importlib.util.spec_from_file_location(module_str, module_path)
1✔
123
            module = importlib.util.module_from_spec(spec)
1✔
124
            spec.loader.exec_module(module)
1✔
125

126
            # Inspect the module to find classes ending with 'Check'
127
            for name, arg in inspect.getmembers(module, inspect.isclass):
1✔
128
                if name == "BasicCheck" or name == "HTTPPostCheck":
1✔
129
                    continue
1✔
130
                if not name.endswith("Check"):
1✔
131
                    continue
×
132
                found_checks.append(arg)
1✔
133

134
        return found_checks
1✔
135

136
    def check_name_to_obj(self, check_name):
1✔
137
        if not hasattr(self, "_check_map"):
1✔
138
            self._check_map = {check.__name__: check for check in self.checks}
×
139
        return self._check_map.get(check_name)
1✔
140

141
    @staticmethod
    def _safe_regex_search(pattern, text, env_id=None, timeout=5):
        """Run re.search with a timeout to prevent ReDoS hangs.

        Uses signal.alarm on the main thread. Falls back to literal
        match if the regex takes longer than *timeout* seconds or is invalid.

        Args:
            pattern: regex pattern to search for.
            text: check output to search within.
            env_id: environment id, used only for log context.
            timeout: whole seconds before the search is abandoned
                (signal.alarm only supports integer seconds).

        Returns:
            On the normal path, the result of ``re.search`` (a Match object
            or ``None``); on either fallback path, a ``bool`` from a literal
            substring test. Callers must rely only on truthiness.

        NOTE(review): SIGALRM is Unix-only and signal handlers can only be
        installed from the main thread — TODO confirm this is never called
        from a worker thread.
        """
        def _alarm_handler(signum, frame):
            raise TimeoutError("Regex timed out")

        # Save the previous SIGALRM handler so this helper does not clobber
        # any other user of the alarm signal.
        old_handler = signal.signal(signal.SIGALRM, _alarm_handler)
        signal.alarm(timeout)
        try:
            result = re.search(pattern, text)
        except TimeoutError:
            logger.warning(
                "Regex timed out after %ds for environment %s, pattern %r — falling back to literal match",
                timeout,
                env_id,
                pattern,
            )
            result = pattern in text
        except re.error:
            logger.warning(
                "Invalid regex pattern for environment %s: %r, falling back to literal match",
                env_id,
                pattern,
            )
            result = pattern in text
        finally:
            # Always cancel any pending alarm and restore the prior handler,
            # even when re.search itself raised.
            signal.alarm(0)
            signal.signal(signal.SIGALRM, old_handler)
        return result
174

175
    def sleep(self, seconds):
1✔
176
        try:
×
177
            time.sleep(seconds)
×
178
        except Exception:
×
179
            self.shutdown()
×
180

181
    def is_last_round(self):
1✔
182
        return self.last_round or (self.rounds_run == self.total_rounds and self.total_rounds != 0)
1✔
183

184
    def all_pending_tasks(self, tasks, completed=None):
        """Return list of task IDs still in PENDING state.

        Args:
            tasks: dict of team_name -> [task_id, ...]
            completed: optional set of already-completed task IDs to skip;
                mutated in place so repeated polls skip finished tasks.

        Returns:
            list of task IDs whose Celery state is still PENDING.
        """
        if completed is None:
            completed = set()
        pending_tasks = []
        # Only the ID lists matter here; the team-name keys are unused,
        # so iterate .values() instead of .items().
        for task_ids in tasks.values():
            for task_id in task_ids:
                if task_id in completed:
                    continue
                task = execute_command.AsyncResult(task_id)
                if task.state == "PENDING":
                    pending_tasks.append(task_id)
                else:
                    # Remember finished tasks so we never re-poll them.
                    completed.add(task_id)
        return pending_tasks
204

205
    def run(self):
        """Main engine loop: run rounds until shutdown or total_rounds is reached.

        Each round: dispatch one Celery check task per service, wait for the
        workers (bounded by a hard timeout), grade each task's output against
        the environment's match/reject patterns, persist KB/Round/Check rows,
        refresh caches, then sleep out the remainder of the target round time.
        On a DB error mid-round, every row created for the round is deleted
        and the process exits with status 1.
        """
        if self.total_rounds == 0:
            logger.info("Running engine for unlimited rounds")
        else:
            logger.info("Running engine for {0} round(s)".format(self.total_rounds))

        while not self.is_last_round():
            # End any stale transaction so MySQL REPEATABLE READ gets a
            # fresh snapshot.  Without this, the pause loop would hold an
            # open transaction and never see the updated engine_paused value.
            self.db.session.rollback()

            if Setting.get_setting("engine_paused").value:
                pause_duration = int(Setting.get_setting("pause_duration").value)
                logger.info("Engine Paused. Sleeping for {0} seconds".format(pause_duration))
                self.sleep(pause_duration)
                continue

            # Re-sync round counter from DB (handles rollback while paused or between rounds)
            db_round = Round.get_last_round_num()
            if db_round < self.current_round:
                logger.warning(
                    "Round rollback detected: engine was at round %d, DB says %d. Re-syncing.",
                    self.current_round,
                    db_round,
                )
                self.current_round = db_round

            self.current_round += 1
            logger.info("Running round: " + str(self.current_round))
            round_start_time = datetime.now()
            self.round_running = True
            self.rounds_run += 1

            # Eager-load environments, properties, and accounts to avoid N+1 queries.
            # Service.team is already lazy="joined" so it comes for free.
            # NOTE(review): .all() already returns a list, so the trailing [:]
            # copy looks redundant — confirm before removing.
            services = self.db.session.query(Service).options(
                selectinload(Service.environments).selectinload(Environment.properties),
                selectinload(Service.accounts),
            ).all()[:]
            logger.info("Loaded %d services from database", len(services))
            # Shuffle so no team/service is consistently checked first.
            random.shuffle(services)
            jitter_max = self.config.task_jitter_max_delay
            task_ids = {}  # team_name -> [task_id, ...]
            task_env_map = {}  # task_id -> environment_id for timeout fallback
            for service in services:
                check_class = self.check_name_to_obj(service.check_name)
                if check_class is None:
                    raise LookupError("Unable to map service to check code for " + str(service.check_name))
                logger.debug("Adding " + service.team.name + " - " + service.name + " check to queue")
                dispatch_start = time.time()
                # One randomly chosen environment is checked per service per round.
                environment = random.choice(service.environments)
                check_obj = check_class(environment)
                command_str = check_obj.command()
                job = Job(environment_id=environment.id, command=command_str)
                # Random per-task delay spreads worker load across the round.
                countdown = random.uniform(0, jitter_max) if jitter_max > 0 else 0
                task = execute_command.apply_async(args=[job], queue=service.worker_queue, countdown=countdown)
                dispatch_elapsed = time.time() - dispatch_start
                if dispatch_elapsed > 1.0:
                    logger.warning(
                        "Slow task dispatch: %s - %s took %.1fs (check=%s)",
                        service.team.name, service.name, dispatch_elapsed, service.check_name,
                    )
                team_name = environment.service.team.name
                if team_name not in task_ids:
                    task_ids[team_name] = []
                task_ids[team_name].append(task.id)
                task_env_map[task.id] = environment.id

            total_tasks = sum(len(ids) for ids in task_ids.values())
            logger.info("Dispatched %d tasks to %d team queues", total_tasks, len(task_ids))

            # This list tracks every DB object created for the current round
            # in case we need to back out the changes and avoid leaving
            # inconsistent check results behind.
            cleanup_items = []

            try:
                # We store the list of tasks in the db, so that the web app
                # can consume them and can dynamically update a progress bar
                task_ids_str = json.dumps(task_ids)
                latest_kb = KB(name="task_ids", value=task_ids_str, round_num=self.current_round)
                cleanup_items.append(latest_kb)
                self.db.session.add(latest_kb)
                self.db.session.commit()
                logger.info("Saved task manifest to KB, waiting for workers")

                completed_tasks = set()
                pending_tasks = self.all_pending_tasks(task_ids, completed_tasks)
                round_wait_start = time.time()
                # Pre-fetch settings used in the wait loop
                target_round_time = int(Setting.get_setting("target_round_time").value)
                worker_refresh_time = int(Setting.get_setting("worker_refresh_time").value)
                # Hard ceiling: 3x the target round time or 5 minutes, whichever is greater
                max_round_wait = max(target_round_time * 3, 300)
                while pending_tasks:
                    elapsed = time.time() - round_wait_start
                    if elapsed >= max_round_wait:
                        logger.warning(
                            "Round timeout reached (%.0fs). Revoking %d stuck task(s) and proceeding.",
                            elapsed,
                            len(pending_tasks),
                        )
                        for stuck_task_id in pending_tasks:
                            execute_command.AsyncResult(stuck_task_id).revoke(terminate=True)
                        break
                    waiting_info = "Waiting for all jobs to finish (sleeping " + str(worker_refresh_time) + " seconds)"
                    waiting_info += " " + str(len(pending_tasks)) + " left in queue."
                    logger.info(waiting_info)
                    self.sleep(worker_refresh_time)
                    pending_tasks = self.all_pending_tasks(task_ids, completed_tasks)
                else:
                    # while/else: only logged when the loop drained naturally
                    # (no break via the timeout path above).
                    logger.info("All jobs have finished for this round")

                logger.info("Determining check results and saving to db")
                round_obj = Round(round_start=round_start_time, number=self.current_round)
                cleanup_items.append(round_obj)
                self.db.session.add(round_obj)
                self.db.session.commit()

                # Pre-fetch all environments needed for result processing in one query
                all_env_ids = list(set(task_env_map.values()))
                env_query = (
                    self.db.session.query(Environment)
                    .options(selectinload(Environment.service))
                    .filter(Environment.id.in_(all_env_ids))
                    .all()
                )
                env_cache = {e.id: e for e in env_query}

                logger.info("Pre-fetched %d environments, processing task results", len(env_cache))

                # We keep track of the number of passed and failed checks per round
                # so we can report a little bit at the end of each round
                teams = {}
                processed_count = 0
                # NOTE(review): this loop rebinds ``task_ids`` from the
                # team->ids dict to each team's id list. Iterating .items()
                # is still safe because the view is created before the first
                # rebinding, but after the loop ``task_ids`` no longer refers
                # to the dict — confirm nothing below relies on it.
                for team_name, task_ids in task_ids.items():
                    for task_id in task_ids:
                        task_start = time.time()
                        task = execute_command.AsyncResult(task_id)
                        # Fetch meta once to avoid double deserialization of large results
                        task_state = task.state
                        task_result = task.result if task_state == "SUCCESS" else None
                        task_elapsed = time.time() - task_start
                        processed_count += 1
                        if processed_count % 100 == 0:
                            logger.info("Processing results: %d/%d tasks", processed_count, total_tasks)
                        if task_elapsed > 1.0:
                            output_len = len(task_result.get("output", "")) if isinstance(task_result, dict) else 0
                            logger.warning(
                                "Slow task result fetch: task %s took %.1fs (state=%s, output_len=%d)",
                                task_id, task_elapsed, task_state, output_len,
                            )

                        # Handle stuck/revoked/failed tasks via env mapping
                        if task_result is None or not isinstance(task_result, dict):
                            env_id = task_env_map.get(task_id)
                            if env_id is None:
                                logger.warning("No result or env mapping for task %s (state=%s), skipping", task_id, task_state)
                                continue
                            environment = env_cache.get(env_id)
                            if environment is None:
                                logger.warning("Environment %s not found for timed-out task %s, skipping", env_id, task_id)
                                continue
                            logger.warning(
                                "Task %s stuck/failed (state=%s), marking %s - %s as timed out",
                                task_id, task_state, environment.service.team.name, environment.service.name,
                            )
                            result = False
                            reason = CHECK_TIMED_OUT_TEXT
                            full_output = "Task did not complete within the round time limit."
                        else:
                            environment = env_cache.get(task_result["environment_id"])
                            if environment is None:
                                logger.warning("Environment %s not found for task %s, skipping", task_result["environment_id"], task_id)
                                continue
                            full_output = task_result["output"][:5000]
                            if task_result["errored_out"]:
                                result = False
                                reason = CHECK_TIMED_OUT_TEXT
                            else:
                                # Truthiness only: _safe_regex_search may return
                                # a Match, None, or a bool (fallback paths).
                                matched = self._safe_regex_search(
                                    environment.matching_content, full_output, environment.id
                                )
                                if matched:
                                    # Check reject pattern - if it matches, fail even though content matched
                                    if environment.matching_content_reject:
                                        rejected = self._safe_regex_search(
                                            environment.matching_content_reject, full_output, environment.id
                                        )
                                        if rejected:
                                            result = False
                                            reason = CHECK_FAILURE_TEXT
                                        else:
                                            result = True
                                            reason = CHECK_SUCCESS_TEXT
                                    else:
                                        result = True
                                        reason = CHECK_SUCCESS_TEXT
                                else:
                                    result = False
                                    reason = CHECK_FAILURE_TEXT

                        if environment.service.team.name not in teams:
                            teams[environment.service.team.name] = {
                                "Success": [],
                                "Failed": [],
                            }
                        if result:
                            teams[environment.service.team.name]["Success"].append(environment.service.name)
                        else:
                            teams[environment.service.team.name]["Failed"].append(environment.service.name)

                        check = Check(service=environment.service, round=round_obj)

                        # TODO: File writes disabled for performance investigation.
                        # Re-enable once Redis output cap proves sufficient.
                        # # Write full output to disk for later retrieval
                        # try:
                        #     team_name_safe = environment.service.team.name
                        #     service_name_safe = environment.service.name
                        #     output_dir = os.path.join(
                        #         self.config.check_output_folder,
                        #         team_name_safe,
                        #         service_name_safe,
                        #     )
                        #     os.makedirs(output_dir, exist_ok=True)
                        #     output_path = os.path.join(output_dir, f"round_{self.current_round}.txt")
                        #     with open(output_path, "w") as f:
                        #         f.write(full_output)
                        # except Exception as write_err:
                        #     logger.warning("Failed to write check output to disk: %s", write_err)

                        # Store 5K in DB (matches Redis MAX_OUTPUT cap)
                        command = task_result["command"] if task_result else ""
                        check.finished(
                            result=result,
                            reason=reason,
                            output=full_output[:5000],
                            command=command,
                        )
                        cleanup_items.append(check)
                        self.db.session.add(check)
                logger.info("Processed %d check results, committing to database", total_tasks)
                round_end_time = datetime.now()
                round_obj.round_end = round_end_time
                self.db.session.commit()
                logger.info("Database commit complete")

            except Exception as e:
                # We got an error while writing to db (could be normal docker stop command)
                # but we gotta clean up any trace of the current round so when we startup
                # again, we're at a consistent state
                logger.error("Error received while writing check results to db")
                logger.exception(e)
                logger.error("Ending round and cleaning up the db")
                for cleanup_item in cleanup_items:
                    try:
                        self.db.session.delete(cleanup_item)
                        self.db.session.commit()
                    except Exception:
                        pass
                sys.exit(1)

            # round_end_time and teams are always bound here: the except path
            # above exits the process, so only the successful try path reaches
            # this point.
            logger.info("Finished Round " + str(self.current_round))
            logger.info("Round Duration " + str((round_end_time - round_start_time).seconds) + " seconds")
            logger.info("Round Stats:")
            for team_name in sorted(teams):
                stat_string = " " + team_name
                stat_string += " Success: " + str(len(teams[team_name]["Success"]))
                stat_string += ", Failed: " + str(len(teams[team_name]["Failed"]))
                if len(teams[team_name]["Failed"]) > 0:
                    stat_string += " (" + ", ".join(teams[team_name]["Failed"]) + ")"
                logger.info(stat_string)

            logger.info("Updating Caches")
            update_all_cache(current_app)

            # Clear session identity map to prevent bloat across rounds.
            # Without this, the session accumulates hundreds of objects per round
            # and the next Service query stalls reconciling stale state.
            self.db.session.expire_all()

            self.round_running = False

            if not self.is_last_round():
                # Sleep out the remainder of the target round time so rounds
                # start on a roughly fixed cadence.
                target_round_time = int(Setting.get_setting("target_round_time").value)
                round_duration = (datetime.now() - round_start_time).seconds
                round_delta = target_round_time - round_duration
                if round_delta > 0:
                    logger.info(
                        f"Targetting {target_round_time} seconds per round. Sleeping " + str(round_delta) + " seconds"
                    )
                    self.sleep(round_delta)
                else:
                    logger.warning(
                        f"Service checks lasted {abs(round_delta)}s longer than round length ({target_round_time}s). Starting next round immediately"
                    )

        logger.info("Engine finished running")
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc