atlanticwave-sdx / sdx-controller / 19274641269

11 Nov 2025 06:13PM UTC coverage: 54.689% (-0.5%) from 55.206%

Pull Request #498 (github / web-flow): Merge bd566bc8d into 122f59c7f
On restart, sync the topology vlan/bw state from connections out of the db

2 of 26 new or added lines in 2 files covered (7.69%).

99 existing lines in 2 files now uncovered.

1248 of 2282 relevant lines covered (54.69%)

1.09 hits per line

Source File
/sdx_controller/messaging/rpc_queue_consumer.py (50.61% of lines covered)
#!/usr/bin/env python
import json
import logging
import os
import threading
import time
import traceback
from queue import Queue

import pika
from sdx_datamodel.constants import (
    Constants,
    DomainStatus,
    MessageQueueNames,
    MongoCollections,
)
from sdx_datamodel.models.topology import SDX_TOPOLOGY_ID_prefix
from sdx_pce.models import ConnectionSolution

from sdx_controller.handlers.connection_handler import (
    ConnectionHandler,
    connection_state_machine,
    get_connection_status,
    parse_conn_status,
)
from sdx_controller.handlers.lc_message_handler import LcMessageHandler
from sdx_controller.utils.parse_helper import ParseHelper

MQ_HOST = os.getenv("MQ_HOST")
MQ_PORT = os.getenv("MQ_PORT") or 5672
MQ_USER = os.getenv("MQ_USER") or "guest"
MQ_PASS = os.getenv("MQ_PASS") or "guest"
HEARTBEAT_INTERVAL = int(os.getenv("HEARTBEAT_INTERVAL", 10))  # seconds
HEARTBEAT_TOLERANCE = int(
    os.getenv("HEARTBEAT_TOLERANCE", 3)
)  # consecutive missed heartbeats allowed


# subscribe to the corresponding queue
SUB_QUEUE = MessageQueueNames.OXP_UPDATE

logger = logging.getLogger(__name__)

class HeartbeatMonitor:
    def __init__(self, db_instance):
        self.last_heartbeat = {}  # domain -> last heartbeat timestamp
        self.domain_status = {}  # domain -> current status (UP / UNKNOWN)
        self.lock = threading.Lock()
        self.monitoring = False
        self.db_instance = db_instance  # store DB instance

    def record_heartbeat(self, domain):
        """Record heartbeat from a domain and mark it as UP if previously UNKNOWN."""
        with self.lock:
            self.last_heartbeat[domain] = time.time()

            previous_status = self.domain_status.get(domain)
            self.domain_status[domain] = DomainStatus.UP

            # Update DB if status changed from UNKNOWN -> UP
            if previous_status == DomainStatus.UNKNOWN:
                logger.info(
                    f"[HeartbeatMonitor] Domain {domain} is BACK UP after missed heartbeats."
                )
                domain_dict_from_db = self.db_instance.get_value_from_db(
                    MongoCollections.DOMAINS, Constants.DOMAIN_DICT
                )
                if domain in domain_dict_from_db:
                    domain_dict_from_db[domain] = DomainStatus.UP
                    self.db_instance.add_key_value_pair_to_db(
                        MongoCollections.DOMAINS,
                        Constants.DOMAIN_DICT,
                        domain_dict_from_db,
                    )

            logger.debug(f"[HeartbeatMonitor] Heartbeat recorded for {domain}")

    def check_status(self):
        """Mark domains as UNKNOWN if heartbeats are missing."""
        now = time.time()
        with self.lock:
            for domain, last_time in self.last_heartbeat.items():
                if now - last_time > HEARTBEAT_TOLERANCE * HEARTBEAT_INTERVAL:
                    if self.domain_status.get(domain) != DomainStatus.UNKNOWN:
                        logger.warning(
                            f"[HeartbeatMonitor] Domain {domain} marked UNKNOWN (missed {HEARTBEAT_TOLERANCE} heartbeats)"
                        )
                        self.domain_status[domain] = DomainStatus.UNKNOWN

                        domain_dict_from_db = self.db_instance.get_value_from_db(
                            MongoCollections.DOMAINS, Constants.DOMAIN_DICT
                        )
                        if domain in domain_dict_from_db:
                            domain_dict_from_db[domain] = DomainStatus.UNKNOWN
                            self.db_instance.add_key_value_pair_to_db(
                                MongoCollections.DOMAINS,
                                Constants.DOMAIN_DICT,
                                domain_dict_from_db,
                            )

    def get_status(self, domain):
        """Return the current status of a domain."""
        with self.lock:
            return self.domain_status.get(domain, "unknown")

    def start_monitoring(self):
        """Start a background thread to monitor heartbeat status."""
        if self.monitoring:
            return
        self.monitoring = True
        logger.info("[HeartbeatMonitor] Started monitoring heartbeats.")

        def monitor_loop():
            while self.monitoring:
                self.check_status()
                time.sleep(HEARTBEAT_INTERVAL)

        t = threading.Thread(target=monitor_loop, daemon=True)
        t.start()

class RpcConsumer(object):
    def __init__(self, thread_queue, exchange_name, te_manager):
        self.logger = logging.getLogger(__name__)

        self.logger.info(f"[MQ] Using amqp://{MQ_USER}@{MQ_HOST}:{MQ_PORT}")

        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host=MQ_HOST,
                port=MQ_PORT,
                credentials=pika.PlainCredentials(username=MQ_USER, password=MQ_PASS),
            )
        )

        self.channel = self.connection.channel()
        self.exchange_name = exchange_name

        self.channel.queue_declare(queue=SUB_QUEUE)
        self._thread_queue = thread_queue

        self.te_manager = te_manager

        self._exit_event = threading.Event()

    def on_request(self, ch, method, props, message_body):
        response = message_body
        self._thread_queue.put(message_body)

        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host=MQ_HOST,
                port=MQ_PORT,
                credentials=pika.PlainCredentials(username=MQ_USER, password=MQ_PASS),
            )
        )
        self.channel = self.connection.channel()

        try:
            ch.basic_publish(
                exchange=self.exchange_name,
                routing_key=props.reply_to,
                properties=pika.BasicProperties(correlation_id=props.correlation_id),
                body=str(response),
            )
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception as err:
            self.logger.info(f"[MQ] encountered error when publishing: {err}")

    def start_consumer(self):
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(queue=SUB_QUEUE, on_message_callback=self.on_request)

        self.logger.info(" [MQ] Awaiting requests from queue: " + SUB_QUEUE)
        self.channel.start_consuming()

    def start_sdx_consumer(self, thread_queue, db_instance):
        rpc = RpcConsumer(thread_queue, "", self.te_manager)
        t1 = threading.Thread(target=rpc.start_consumer, args=(), daemon=True)
        t1.start()

        lc_message_handler = LcMessageHandler(db_instance, self.te_manager)
        parse_helper = ParseHelper()

        heartbeat_monitor = HeartbeatMonitor(db_instance)
        heartbeat_monitor.start_monitoring()

        latest_topo = {}
        domain_dict = {}

        # When the SDX controller first starts, read from the DB:
        # if a domain_dict already exists there, reuse it.
        domain_dict_from_db = db_instance.get_value_from_db(
            MongoCollections.DOMAINS, Constants.DOMAIN_DICT
        )
        latest_topo_from_db = db_instance.get_value_from_db(
            MongoCollections.TOPOLOGIES, Constants.LATEST_TOPOLOGY
        )

        if domain_dict_from_db:
            domain_dict = domain_dict_from_db
            logger.debug("Domain list already exists in db: ")
            logger.debug(domain_dict)

        if latest_topo_from_db:
            latest_topo = latest_topo_from_db
            logger.debug("Topology already exists in db: ")
            logger.debug(latest_topo)

        # If topologies are already saved in the db, use them to initialize te_manager
        if domain_dict:
            for domain in domain_dict.keys():
                topology = db_instance.get_value_from_db(
                    MongoCollections.TOPOLOGIES, SDX_TOPOLOGY_ID_prefix + domain
                )

                if not topology:
                    continue

                # Get the actual thing minus the Mongo ObjectID.
                self.te_manager.add_topology(topology)
                logger.debug(f"Read {domain}: {topology}")
            # update topology/pce state in TE Manager
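            # Rebuild the TE Manager's topology VLAN/bandwidth state after a
            # restart by replaying the connection requests, solutions, and
            # breakdowns persisted in MongoDB.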
            connections = db_instance.get_all_entries_in_collection(
                MongoCollections.CONNECTIONS
            )
            if not connections:
                logger.info("No connection was found")
            else:
                for connection in connections:
                    service_id = next(iter(connection))
                    status = get_connection_status(db_instance, service_id)
                    logger.info(
                        f"Restart: service_id: {service_id}, status: {status.get(service_id)}"
                    )
                    request_dict = connection.get(service_id)
                    solution_links = db_instance.read_from_db(
                        MongoCollections.SOLUTIONS, service_id
                    )
                    if not solution_links:
                        logger.warning(
                            f"Could not find solution links for {service_id}"
                        )
                    solution = ConnectionSolution(
                        connection_map={}, cost=0, request_id=service_id
                    )
                    solution.connection_map[request_dict] = solution_links
                    breakdown = db_instance.read_from_db(
                        MongoCollections.BREAKDOWNS, service_id
                    )
                    if not breakdown:
                        logger.warning(f"Could not find breakdown for {service_id}")
                        continue
                    try:
                        breakdown = self.te_manager.generate_connection_breakdown(
                            solution, request_dict
                        )
                        self.logger.info(
                            f"generate_connection_breakdown(): tagged_breakdown: {breakdown}"
                        )

                        # Temporary, to keep tests passing; an exception should
                        # be raised if tagged_breakdown is None.
                    except Exception as e:
                        err = traceback.format_exc().replace("\n", ", ")
                        logger.error(
                            f"Error when recovering breakdown vlan assignment: {e} - {err}"
                        )
                        return f"Error: {e}", 410

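        # Main consumer loop: block on the thread queue, record OXP heartbeats,
        # and hand every other JSON message to the LC message handler.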
        while not self._exit_event.is_set():
            msg = thread_queue.get()
            logger.debug("MQ received message:" + str(msg))

            if not parse_helper.is_json(msg):
                logger.debug("Non JSON message, ignored")
                continue

            msg_json = json.loads(msg)
            if "type" in msg_json and msg_json.get("type") == "Heart Beat":
                domain = msg_json.get("domain")
                heartbeat_monitor.record_heartbeat(domain)
                logger.debug(f"Heart beat received from {domain}")
                continue

            try:
                lc_message_handler.process_lc_json_msg(
                    msg,
                    latest_topo,
                    domain_dict,
                )
            except Exception as exc:
                err = traceback.format_exc().replace("\n", ", ")
                logger.error(f"Failed to process LC message: {exc} -- {err}")

    def stop_threads(self):
        """
        Signal threads that we're ready to stop.
        """
        logger.info("[MQ] Stopping threads.")
        self.channel.stop_consuming()
        self._exit_event.set()
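
Most of HeartbeatMonitor's status transitions are reported uncovered above. A minimal test sketch along these lines (hypothetical, not from the repository; db_instance is replaced by a unittest.mock.MagicMock and the domain name is illustrative) would exercise the UP -> UNKNOWN -> UP path and the DB write-back:

import time
from unittest.mock import MagicMock

from sdx_datamodel.constants import Constants, DomainStatus, MongoCollections

from sdx_controller.messaging.rpc_queue_consumer import HeartbeatMonitor


def test_domain_goes_unknown_then_recovers():
    domain = "urn:sdx:topology:amlight.net"  # illustrative domain name
    db = MagicMock()
    db.get_value_from_db.return_value = {domain: DomainStatus.UP}

    monitor = HeartbeatMonitor(db)

    # First heartbeat marks the domain UP without touching the DB.
    monitor.record_heartbeat(domain)
    assert monitor.get_status(domain) == DomainStatus.UP

    # Age the last heartbeat so check_status() flips the domain to UNKNOWN.
    monitor.last_heartbeat[domain] = time.time() - 10_000
    monitor.check_status()
    assert monitor.get_status(domain) == DomainStatus.UNKNOWN

    # The next heartbeat marks it UP again and writes the change back.
    monitor.record_heartbeat(domain)
    assert monitor.get_status(domain) == DomainStatus.UP
    db.add_key_value_pair_to_db.assert_called_with(
        MongoCollections.DOMAINS, Constants.DOMAIN_DICT, {domain: DomainStatus.UP}
    )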
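
For context, a rough sketch of how the consumer might be wired up by the controller (hypothetical, not taken from the repository; it assumes te_manager and db_instance are constructed elsewhere, and that MQ_HOST points at a reachable RabbitMQ broker, since RpcConsumer.__init__ opens a blocking connection):

import threading
from queue import Queue

from sdx_controller.messaging.rpc_queue_consumer import RpcConsumer


def start_messaging(te_manager, db_instance):
    thread_queue = Queue()
    consumer = RpcConsumer(thread_queue, "", te_manager)
    worker = threading.Thread(
        target=consumer.start_sdx_consumer,
        args=(thread_queue, db_instance),
        daemon=True,
    )
    worker.start()
    return consumer  # caller invokes consumer.stop_threads() on shutdown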