• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

atlanticwave-sdx / sdx-controller / 18220499892

03 Oct 2025 11:04AM UTC coverage: 55.233% (-0.08%) from 55.313%
18220499892

Pull #494

github

web-flow
Merge d80390b13 into eaf9bb32a
Pull Request #494: Add generic exception handler to avoid SDX-Controller queue consumer crash on failures

1 of 6 new or added lines in 1 file covered. (16.67%)

1219 of 2207 relevant lines covered (55.23%)

1.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.67
/sdx_controller/messaging/rpc_queue_consumer.py
1
#!/usr/bin/env python
2
import json
2✔
3
import logging
2✔
4
import os
2✔
5
import threading
2✔
6
import traceback
2✔
7
from queue import Queue
2✔
8

9
import pika
2✔
10
from sdx_datamodel.constants import Constants, MessageQueueNames, MongoCollections
2✔
11
from sdx_datamodel.models.topology import SDX_TOPOLOGY_ID_prefix
2✔
12

13
from sdx_controller.handlers.lc_message_handler import LcMessageHandler
2✔
14
from sdx_controller.utils.parse_helper import ParseHelper
2✔
15

16
MQ_HOST = os.getenv("MQ_HOST")
2✔
17
MQ_PORT = os.getenv("MQ_PORT") or 5672
2✔
18
MQ_USER = os.getenv("MQ_USER") or "guest"
2✔
19
MQ_PASS = os.getenv("MQ_PASS") or "guest"
2✔
20

21
# subscribe to the corresponding queue
22
SUB_QUEUE = MessageQueueNames.OXP_UPDATE
2✔
23

24
logger = logging.getLogger(__name__)
2✔
25

26

27
class RpcConsumer(object):
2✔
28
    def __init__(self, thread_queue, exchange_name, te_manager):
2✔
29
        self.logger = logging.getLogger(__name__)
2✔
30

31
        self.logger.info(f"[MQ] Using amqp://{MQ_USER}@{MQ_HOST}:{MQ_PORT}")
2✔
32

33
        self.connection = pika.BlockingConnection(
2✔
34
            pika.ConnectionParameters(
35
                host=MQ_HOST,
36
                port=MQ_PORT,
37
                credentials=pika.PlainCredentials(username=MQ_USER, password=MQ_PASS),
38
            )
39
        )
40

41
        self.channel = self.connection.channel()
2✔
42
        self.exchange_name = exchange_name
2✔
43

44
        self.channel.queue_declare(queue=SUB_QUEUE)
2✔
45
        self._thread_queue = thread_queue
2✔
46

47
        self.te_manager = te_manager
2✔
48

49
        self._exit_event = threading.Event()
2✔
50

51
    def on_request(self, ch, method, props, message_body):
2✔
52
        response = message_body
×
53
        self._thread_queue.put(message_body)
×
54

55
        self.connection = pika.BlockingConnection(
×
56
            pika.ConnectionParameters(
57
                host=MQ_HOST,
58
                port=MQ_PORT,
59
                credentials=pika.PlainCredentials(username=MQ_USER, password=MQ_PASS),
60
            )
61
        )
62
        self.channel = self.connection.channel()
×
63

64
        try:
×
65
            ch.basic_publish(
×
66
                exchange=self.exchange_name,
67
                routing_key=props.reply_to,
68
                properties=pika.BasicProperties(correlation_id=props.correlation_id),
69
                body=str(response),
70
            )
71
            ch.basic_ack(delivery_tag=method.delivery_tag)
×
72
        except Exception as err:
×
73
            self.logger.info(f"[MQ] encountered error when publishing: {err}")
×
74

75
    def start_consumer(self):
2✔
76
        self.channel.basic_qos(prefetch_count=1)
2✔
77
        self.channel.basic_consume(queue=SUB_QUEUE, on_message_callback=self.on_request)
2✔
78

79
        self.logger.info(" [MQ] Awaiting requests from queue: " + SUB_QUEUE)
2✔
80
        self.channel.start_consuming()
2✔
81

82
    def start_sdx_consumer(self, thread_queue, db_instance):
2✔
83
        HEARTBEAT_ID = 0
2✔
84

85
        rpc = RpcConsumer(thread_queue, "", self.te_manager)
2✔
86
        t1 = threading.Thread(target=rpc.start_consumer, args=(), daemon=True)
2✔
87
        t1.start()
2✔
88

89
        lc_message_handler = LcMessageHandler(db_instance, self.te_manager)
2✔
90
        parse_helper = ParseHelper()
2✔
91

92
        latest_topo = {}
2✔
93
        domain_list = []
2✔
94

95
        # This part reads from DB when SDX controller initially starts.
96
        # It looks for domain_list, if already in DB,
97
        # Then use the existing ones from DB.
98
        domain_list_from_db = db_instance.get_value_from_db(
2✔
99
            MongoCollections.DOMAINS, Constants.DOMAIN_LIST
100
        )
101
        latest_topo_from_db = db_instance.get_value_from_db(
2✔
102
            MongoCollections.TOPOLOGIES, Constants.LATEST_TOPOLOGY
103
        )
104

105
        if domain_list_from_db:
2✔
106
            domain_list = domain_list_from_db
×
107
            logger.debug("Domain list already exists in db: ")
×
108
            logger.debug(domain_list)
×
109

110
        if latest_topo_from_db:
2✔
111
            latest_topo = latest_topo_from_db
2✔
112
            logger.debug("Topology already exists in db: ")
2✔
113
            logger.debug(latest_topo)
2✔
114

115
        # If topologies already saved in db, use them to initialize te_manager
116
        if domain_list:
2✔
117
            for domain in domain_list:
×
118
                topology = db_instance.get_value_from_db(
×
119
                    MongoCollections.TOPOLOGIES, SDX_TOPOLOGY_ID_prefix + domain
120
                )
121

122
                if not topology:
×
123
                    continue
×
124

125
                # Get the actual thing minus the Mongo ObjectID.
126
                self.te_manager.add_topology(topology)
×
127
                logger.debug(f"Read {domain}: {topology}")
×
128

129
        while not self._exit_event.is_set():
2✔
130
            # Queue.get() will block until there's an item in the queue.
131
            msg = thread_queue.get()
2✔
132
            logger.debug("MQ received message:" + str(msg))
×
133

134
            if "Heart Beat" in str(msg):
×
135
                HEARTBEAT_ID += 1
×
136
                logger.debug("Heart beat received. ID: " + str(HEARTBEAT_ID))
×
137
                continue
×
138

139
            if not parse_helper.is_json(msg):
×
140
                continue
×
141

142
            if "version" not in str(msg):
×
143
                logger.info("Got message (NO VERSION) from MQ: " + str(msg))
×
144

NEW
145
            try:
×
NEW
146
                lc_message_handler.process_lc_json_msg(
×
147
                    msg,
148
                    latest_topo,
149
                    domain_list,
150
                )
NEW
151
            except Exception as exc:
×
NEW
152
                err = traceback.format_exc().replace("\n", ", ")
×
NEW
153
                logger.error(f"Failed to process LC message: {exc} -- {err}")
×
154

155
    def stop_threads(self):
2✔
156
        """
157
        Signal threads that we're ready to stop.
158
        """
159
        logger.info("[MQ] Stopping threads.")
×
160
        self.channel.stop_consuming()
×
161
        self._exit_event.set()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc