• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

scoringengine / scoringengine / 23385248069

21 Mar 2026 05:50PM UTC coverage: 73.202% (-2.5%) from 75.69%
23385248069

push

github

RustyBower
Fix test to match DB fallback behavior for missing output files

The endpoint now returns check.output from DB (200) instead of 404
when the on-disk file doesn't exist.

3726 of 5090 relevant lines covered (73.2%)

0.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.65
/scoring_engine/celery_stats.py
1
from sqlalchemy.orm import joinedload
1✔
2

3
from scoring_engine.celery_app import celery_app
1✔
4
from scoring_engine.db import db
1✔
5
from scoring_engine.models.service import Service
1✔
6
from scoring_engine.models.team import Team
1✔
7

8

9
class CeleryStats:
1✔
10
    @staticmethod
1✔
11
    def get_queue_stats():
1✔
12
        finished_queue_facts = []
1✔
13

14
        queues_facts = {}
1✔
15

16
        all_services = db.session.query(Service).options(joinedload(Service.team)).all()
1✔
17
        for service in all_services:
1✔
18
            if service.worker_queue not in queues_facts:
1✔
19
                queues_facts[service.worker_queue] = {
1✔
20
                    "name": service.worker_queue,
21
                    "workers": [],
22
                    "services_running": {},
23
                }
24
            if service.team.name not in queues_facts[service.worker_queue]["services_running"]:
1✔
25
                queues_facts[service.worker_queue]["services_running"][service.team.name] = []
1✔
26
            queues_facts[service.worker_queue]["services_running"][service.team.name].append(service.name)
1✔
27

28
        for queue_name, queue_facts in queues_facts.items():
1✔
29
            # If all of the services are listed for this specific worker, let's just alias it as 'All'
30
            queue_services_total_running = 0
1✔
31
            for team_name, team_services in queues_facts[service.worker_queue]["services_running"].items():
1✔
32
                queue_services_total_running += len(team_services)
1✔
33
            if queue_services_total_running == len(all_services):
1✔
34
                queues_facts[service.worker_queue]["services_running"] = "All"
1✔
35
            else:
36
                blue_teams = Team.get_all_blue_teams()
×
37
                for blue_team in blue_teams:
×
38
                    if blue_team.name in queue_facts["services_running"] and len(blue_team.services) == len(
×
39
                        queue_facts["services_running"][blue_team.name]
40
                    ):
41
                        # Summarize it for each team if the worker runs all services
42
                        queues_facts[service.worker_queue]["services_running"][blue_team.name] = "ALL"
×
43

44
        for queue_name, queue_facts in queues_facts.items():
1✔
45
            # Get which workers are assigned to which queues
46
            active_queues = celery_app.control.inspect().active_queues()
1✔
47
            # If we don't have any queues, we also have no workers
48
            if active_queues is not None:
1✔
49
                for worker_name, queues in active_queues.items():
1✔
50
                    if queue_name in [k["name"] for k in queues]:
1✔
51
                        queue_facts["workers"].append(worker_name)
1✔
52

53
        for queue_name, queue_facts in queues_facts.items():
1✔
54
            finished_queue_facts.append(queue_facts)
1✔
55
        return finished_queue_facts
1✔
56

57
    @staticmethod
1✔
58
    def get_worker_stats():
1✔
59
        finished_worker_facts = []
1✔
60
        worker_facts = {}
1✔
61

62
        # Get which workers are assigned to which queues
63
        active_queues = celery_app.control.inspect().active_queues()
1✔
64
        # If we don't have any queues, we also have no workers
65
        if active_queues is None:
1✔
66
            return finished_worker_facts
1✔
67

68
        for worker_name, queues in active_queues.items():
1✔
69
            queue_names = []
1✔
70
            for queue in queues:
1✔
71
                queue_names.append(queue["name"])
1✔
72
            worker_facts[worker_name] = {
1✔
73
                "worker_queues": queue_names,
74
            }
75

76
        # Get worker stats about completed tasks and such
77
        active_stats = celery_app.control.inspect().stats()
1✔
78
        for worker_name, stats in active_stats.items():
1✔
79
            completed_tasks = 0
1✔
80
            if "execute_command" in stats["total"]:
1✔
81
                completed_tasks = stats["total"]["execute_command"]
1✔
82
            worker_facts[worker_name]["completed_tasks"] = completed_tasks
1✔
83
            worker_facts[worker_name]["num_threads"] = stats["pool"]["max-concurrency"]
1✔
84

85
        # Get worker stats about currently running tasks
86
        active_tasks_stats = celery_app.control.inspect().active()
1✔
87
        for worker_name, stats in active_tasks_stats.items():
1✔
88
            worker_facts[worker_name]["running_tasks"] = len(stats)
1✔
89

90
        # Produce list of Service checks this worker will run
91
        all_services = db.session.query(Service).options(joinedload(Service.team)).all()
1✔
92
        for worker_name, facts in worker_facts.items():
1✔
93
            facts["services_running"] = []
1✔
94
            services_running = {}
1✔
95
            for service in all_services:
1✔
96
                if service.worker_queue in facts["worker_queues"]:
1✔
97
                    if service.team.name not in services_running:
1✔
98
                        services_running[service.team.name] = []
1✔
99
                    services_running[service.team.name].append(service.name)
1✔
100
            # If all of the services are listed for this specific worker, let's just alias it as 'All'
101
            worker_services_total_running = 0
1✔
102
            for team_name, team_services in services_running.items():
1✔
103
                worker_services_total_running += len(team_services)
1✔
104
            if worker_services_total_running == len(all_services):
1✔
105
                facts["services_running"] = "All"
1✔
106
            else:
107
                facts["services_running"] = services_running
×
108
                blue_teams = Team.get_all_blue_teams()
×
109
                for blue_team in blue_teams:
×
110
                    if blue_team.name in services_running and len(blue_team.services) == len(
×
111
                        services_running[blue_team.name]
112
                    ):
113
                        # Summarize it for each team if the worker runs all services
114
                        facts["services_running"][blue_team.name] = "ALL"
×
115

116
            # Instead of an empty string in the table, let's tell them None
117
            if len(facts["services_running"]) == 0:
1✔
118
                facts["services_running"] = "None"
×
119
            # Clean up services_running
120

121
            finished_worker_facts.append(
1✔
122
                {
123
                    "worker_name": worker_name,
124
                    "services_running": facts["services_running"],
125
                    "num_threads": facts["num_threads"],
126
                    "completed_tasks": facts["completed_tasks"],
127
                    "running_tasks": facts["running_tasks"],
128
                    "worker_queues": facts["worker_queues"],
129
                }
130
            )
131
        return finished_worker_facts
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc