• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

atlanticwave-sdx / sdx-controller / 18915548677

29 Oct 2025 04:44PM UTC coverage: 55.206% (-0.03%) from 55.233%
18915548677

Pull #499

github

web-flow
Merge cf9395424 into b4e05fd58
Pull Request #499: Change domain list to dict

34 of 71 new or added lines in 3 files covered. (47.89%)

3 existing lines in 1 file now uncovered.

1246 of 2257 relevant lines covered (55.21%)

1.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.27
/sdx_controller/messaging/rpc_queue_consumer.py
1
#!/usr/bin/env python
2
import json
2✔
3
import logging
2✔
4
import os
2✔
5
import threading
2✔
6
import time
2✔
7
import traceback
2✔
8
from queue import Queue
2✔
9

10
import pika
2✔
11
from sdx_datamodel.constants import (
2✔
12
    Constants,
13
    DomainStatus,
14
    MessageQueueNames,
15
    MongoCollections,
16
)
17
from sdx_datamodel.models.topology import SDX_TOPOLOGY_ID_prefix
2✔
18

19
from sdx_controller.handlers.lc_message_handler import LcMessageHandler
2✔
20
from sdx_controller.utils.parse_helper import ParseHelper
2✔
21

22
MQ_HOST = os.getenv("MQ_HOST")
2✔
23
MQ_PORT = os.getenv("MQ_PORT") or 5672
2✔
24
MQ_USER = os.getenv("MQ_USER") or "guest"
2✔
25
MQ_PASS = os.getenv("MQ_PASS") or "guest"
2✔
26
HEARTBEAT_INTERVAL = int(os.getenv("HEARTBEAT_INTERVAL", 10))  # seconds
2✔
27
HEARTBEAT_TOLERANCE = int(
2✔
28
    os.getenv("HEARTBEAT_TOLERANCE", 3)
29
)  # consecutive missed heartbeats allowed
30

31

32
# subscribe to the corresponding queue
33
SUB_QUEUE = MessageQueueNames.OXP_UPDATE
2✔
34

35
logger = logging.getLogger(__name__)
2✔
36

37

38
class HeartbeatMonitor:
2✔
39
    def __init__(self, db_instance):
2✔
40
        self.last_heartbeat = {}  # domain -> last heartbeat timestamp
2✔
41
        self.domain_status = {}  # domain -> current status (UP / UNKNOWN)
2✔
42
        self.lock = threading.Lock()
2✔
43
        self.monitoring = False
2✔
44
        self.db_instance = db_instance  # store DB instance
2✔
45

46
    def record_heartbeat(self, domain):
2✔
47
        """Record heartbeat from a domain and mark it as UP if previously UNKNOWN."""
NEW
48
        with self.lock:
×
NEW
49
            self.last_heartbeat[domain] = time.time()
×
50

NEW
51
            previous_status = self.domain_status.get(domain)
×
NEW
52
            self.domain_status[domain] = DomainStatus.UP
×
53

54
            # Update DB if status changed from UNKNOWN -> UP
NEW
55
            if previous_status == DomainStatus.UNKNOWN:
×
NEW
56
                logger.info(
×
57
                    f"[HeartbeatMonitor] Domain {domain} is BACK UP after missed heartbeats."
58
                )
NEW
59
                domain_dict_from_db = self.db_instance.get_value_from_db(
×
60
                    MongoCollections.DOMAINS, Constants.DOMAIN_DICT
61
                )
NEW
62
                if domain in domain_dict_from_db:
×
NEW
63
                    domain_dict_from_db[domain] = DomainStatus.UP
×
NEW
64
                    self.db_instance.add_key_value_pair_to_db(
×
65
                        MongoCollections.DOMAINS,
66
                        Constants.DOMAIN_DICT,
67
                        domain_dict_from_db,
68
                    )
69

NEW
70
            logger.debug(f"[HeartbeatMonitor] Heartbeat recorded for {domain}")
×
71

72
    def check_status(self):
2✔
73
        """Mark domains as UNKNOWN if heartbeats are missing."""
74
        now = time.time()
2✔
75
        with self.lock:
2✔
76
            for domain, last_time in self.last_heartbeat.items():
2✔
NEW
77
                if now - last_time > HEARTBEAT_TOLERANCE * HEARTBEAT_INTERVAL:
×
NEW
78
                    if self.domain_status.get(domain) != DomainStatus.UNKNOWN:
×
NEW
79
                        logger.warning(
×
80
                            f"[HeartbeatMonitor] Domain {domain} marked UNKNOWN (missed {HEARTBEAT_TOLERANCE} heartbeats)"
81
                        )
NEW
82
                        self.domain_status[domain] = DomainStatus.UNKNOWN
×
83

NEW
84
                        domain_dict_from_db = self.db_instance.get_value_from_db(
×
85
                            MongoCollections.DOMAINS, Constants.DOMAIN_DICT
86
                        )
NEW
87
                        if domain in domain_dict_from_db:
×
NEW
88
                            domain_dict_from_db[domain] = DomainStatus.UNKNOWN
×
NEW
89
                            self.db_instance.add_key_value_pair_to_db(
×
90
                                MongoCollections.DOMAINS,
91
                                Constants.DOMAIN_DICT,
92
                                domain_dict_from_db,
93
                            )
94

95
    def get_status(self, domain):
2✔
96
        """Return the current status of a domain."""
NEW
97
        with self.lock:
×
NEW
98
            return self.domain_status.get(domain, "unknown")
×
99

100
    def start_monitoring(self):
2✔
101
        """Start a background thread to monitor heartbeat status."""
102
        if self.monitoring:
2✔
NEW
103
            return
×
104
        self.monitoring = True
2✔
105
        logger.info("[HeartbeatMonitor] Started monitoring heartbeats.")
2✔
106

107
        def monitor_loop():
2✔
108
            while self.monitoring:
2✔
109
                self.check_status()
2✔
110
                time.sleep(HEARTBEAT_INTERVAL)
2✔
111

112
        t = threading.Thread(target=monitor_loop, daemon=True)
2✔
113
        t.start()
2✔
114

115

116
class RpcConsumer(object):
2✔
117
    def __init__(self, thread_queue, exchange_name, te_manager):
2✔
118
        self.logger = logging.getLogger(__name__)
2✔
119

120
        self.logger.info(f"[MQ] Using amqp://{MQ_USER}@{MQ_HOST}:{MQ_PORT}")
2✔
121

122
        self.connection = pika.BlockingConnection(
2✔
123
            pika.ConnectionParameters(
124
                host=MQ_HOST,
125
                port=MQ_PORT,
126
                credentials=pika.PlainCredentials(username=MQ_USER, password=MQ_PASS),
127
            )
128
        )
129

130
        self.channel = self.connection.channel()
2✔
131
        self.exchange_name = exchange_name
2✔
132

133
        self.channel.queue_declare(queue=SUB_QUEUE)
2✔
134
        self._thread_queue = thread_queue
2✔
135

136
        self.te_manager = te_manager
2✔
137

138
        self._exit_event = threading.Event()
2✔
139

140
    def on_request(self, ch, method, props, message_body):
2✔
141
        response = message_body
×
142
        self._thread_queue.put(message_body)
×
143

144
        self.connection = pika.BlockingConnection(
×
145
            pika.ConnectionParameters(
146
                host=MQ_HOST,
147
                port=MQ_PORT,
148
                credentials=pika.PlainCredentials(username=MQ_USER, password=MQ_PASS),
149
            )
150
        )
151
        self.channel = self.connection.channel()
×
152

153
        try:
×
154
            ch.basic_publish(
×
155
                exchange=self.exchange_name,
156
                routing_key=props.reply_to,
157
                properties=pika.BasicProperties(correlation_id=props.correlation_id),
158
                body=str(response),
159
            )
160
            ch.basic_ack(delivery_tag=method.delivery_tag)
×
161
        except Exception as err:
×
162
            self.logger.info(f"[MQ] encountered error when publishing: {err}")
×
163

164
    def start_consumer(self):
2✔
165
        self.channel.basic_qos(prefetch_count=1)
2✔
166
        self.channel.basic_consume(queue=SUB_QUEUE, on_message_callback=self.on_request)
2✔
167

168
        self.logger.info(" [MQ] Awaiting requests from queue: " + SUB_QUEUE)
2✔
169
        self.channel.start_consuming()
2✔
170

171
    def start_sdx_consumer(self, thread_queue, db_instance):
2✔
172
        rpc = RpcConsumer(thread_queue, "", self.te_manager)
2✔
173
        t1 = threading.Thread(target=rpc.start_consumer, args=(), daemon=True)
2✔
174
        t1.start()
2✔
175

176
        lc_message_handler = LcMessageHandler(db_instance, self.te_manager)
2✔
177
        parse_helper = ParseHelper()
2✔
178

179
        heartbeat_monitor = HeartbeatMonitor(db_instance)
2✔
180
        heartbeat_monitor.start_monitoring()
2✔
181

182
        latest_topo = {}
2✔
183
        domain_dict = {}
2✔
184

185
        # This part reads from DB when SDX controller initially starts.
186
        # It looks for domain_dict, if already in DB,
187
        # Then use the existing ones from DB.
188
        domain_dict_from_db = db_instance.get_value_from_db(
2✔
189
            MongoCollections.DOMAINS, Constants.DOMAIN_DICT
190
        )
191
        latest_topo_from_db = db_instance.get_value_from_db(
2✔
192
            MongoCollections.TOPOLOGIES, Constants.LATEST_TOPOLOGY
193
        )
194

195
        if domain_dict_from_db:
2✔
NEW
196
            domain_dict = domain_dict_from_db
×
197
            logger.debug("Domain list already exists in db: ")
×
NEW
198
            logger.debug(domain_dict)
×
199

200
        if latest_topo_from_db:
2✔
201
            latest_topo = latest_topo_from_db
2✔
202
            logger.debug("Topology already exists in db: ")
2✔
203
            logger.debug(latest_topo)
2✔
204

205
        # If topologies already saved in db, use them to initialize te_manager
206
        if domain_dict:
2✔
NEW
207
            for domain in domain_dict.keys():
×
UNCOV
208
                topology = db_instance.get_value_from_db(
×
209
                    MongoCollections.TOPOLOGIES, SDX_TOPOLOGY_ID_prefix + domain
210
                )
211

212
                if not topology:
×
213
                    continue
×
214

215
                # Get the actual thing minus the Mongo ObjectID.
216
                self.te_manager.add_topology(topology)
×
217
                logger.debug(f"Read {domain}: {topology}")
×
218

219
        while not self._exit_event.is_set():
2✔
220
            msg = thread_queue.get()
2✔
UNCOV
221
            logger.debug("MQ received message:" + str(msg))
×
222

223
            if not parse_helper.is_json(msg):
×
NEW
224
                logger.debug("Non JSON message, ignored")
×
UNCOV
225
                continue
×
226

NEW
227
            msg_json = json.loads(msg)
×
NEW
228
            if "type" in msg_json and msg_json.get("type") == "Heart Beat":
×
NEW
229
                domain = msg_json.get("domain")
×
NEW
230
                heartbeat_monitor.record_heartbeat(domain)
×
NEW
231
                logger.debug(f"Heart beat received from {domain}")
×
NEW
232
                continue
×
233

234
            try:
×
235
                lc_message_handler.process_lc_json_msg(
×
236
                    msg,
237
                    latest_topo,
238
                    domain_dict,
239
                )
240
            except Exception as exc:
×
241
                err = traceback.format_exc().replace("\n", ", ")
×
242
                logger.error(f"Failed to process LC message: {exc} -- {err}")
×
243

244
    def stop_threads(self):
2✔
245
        """
246
        Signal threads that we're ready to stop.
247
        """
248
        logger.info("[MQ] Stopping threads.")
×
249
        self.channel.stop_consuming()
×
250
        self._exit_event.set()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc