• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

atlanticwave-sdx / sdx-controller / 11257371528

09 Oct 2024 02:42PM UTC coverage: 56.211% (-2.2%) from 58.396%
11257371528

push

github

web-flow
Merge pull request #329 from atlanticwave-sdx/use-lc-x-for-topo-db-key

Use LC-X for topo db key

0 of 7 new or added lines in 2 files covered. (0.0%)

1 existing line in 1 file now uncovered.

1068 of 1900 relevant lines covered (56.21%)

2.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.3
/sdx_controller/messaging/rpc_queue_consumer.py
1
#!/usr/bin/env python
2
import json
4✔
3
import logging
4✔
4
import os
4✔
5
import threading
4✔
6
from queue import Queue
4✔
7

8
import pika
4✔
9

10
from sdx_controller.handlers.lc_message_handler import LcMessageHandler
4✔
11
from sdx_controller.utils.parse_helper import ParseHelper
4✔
12

13
# Broker connection settings, read from the environment once at import time.
MQ_HOST = os.getenv("MQ_HOST")
# Environment values are strings while the fallback is an int; convert so
# MQ_PORT has one consistent type regardless of where it came from.
MQ_PORT = int(os.getenv("MQ_PORT") or 5672)
MQ_USER = os.getenv("MQ_USER") or "guest"
MQ_PASS = os.getenv("MQ_PASS") or "guest"

# subscribe to the corresponding queue
SUB_QUEUE = os.getenv("SUB_QUEUE")

logger = logging.getLogger(__name__)
4✔
22

23

24
class RpcConsumer(object):
    """Consume messages from the queue named by SUB_QUEUE.

    ``on_request`` puts every received message body on ``thread_queue``;
    ``start_sdx_consumer`` drains that queue and passes JSON messages to
    ``LcMessageHandler``.
    """

    # Seconds to wait for a queued message before re-checking the exit flag.
    _QUEUE_POLL_SECONDS = 1.0

    def __init__(self, thread_queue, exchange_name, te_manager):
        """Connect to the broker and declare the subscription queue.

        Args:
            thread_queue: Queue that ``on_request`` puts message bodies on.
            exchange_name: Exchange used when publishing replies.
            te_manager: Passed to ``LcMessageHandler`` and used to re-add
                topologies restored from the database.
        """
        self.logger = logging.getLogger(__name__)

        self.logger.info(f"[MQ] Using amqp://{MQ_USER}@{MQ_HOST}:{MQ_PORT}")

        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host=MQ_HOST,
                port=MQ_PORT,
                credentials=pika.PlainCredentials(username=MQ_USER, password=MQ_PASS),
            )
        )

        self.channel = self.connection.channel()
        self.exchange_name = exchange_name

        self.channel.queue_declare(queue=SUB_QUEUE)
        self._thread_queue = thread_queue

        self.te_manager = te_manager

        self._exit_event = threading.Event()

    def on_request(self, ch, method, props, message_body):
        """Queue an incoming message, echo it to ``props.reply_to``, and ack it.

        The reply and the ack both go through ``ch``, the channel the message
        arrived on, so no additional broker connection is opened here.
        """
        self._thread_queue.put(message_body)

        # NOTE(review): if message_body is bytes, str() yields its b'...'
        # repr -- confirm that is what the reply consumer expects.
        ch.basic_publish(
            exchange=self.exchange_name,
            routing_key=props.reply_to,
            properties=pika.BasicProperties(correlation_id=props.correlation_id),
            body=str(message_body),
        )
        ch.basic_ack(delivery_tag=method.delivery_tag)

    def start_consumer(self):
        """Register ``on_request`` on SUB_QUEUE and start consuming."""
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(queue=SUB_QUEUE, on_message_callback=self.on_request)

        # Lazy formatting: does not raise if SUB_QUEUE is None.
        self.logger.info(" [MQ] Awaiting requests from queue: %s", SUB_QUEUE)
        self.channel.start_consuming()

    def _restore_topologies(self, db_instance, num_domain_topos):
        """Re-add topologies stored under keys LC-1..LC-<num_domain_topos>."""
        for index in range(1, num_domain_topos + 1):
            db_key = f"LC-{index}"
            record = db_instance.read_from_db("topologies", db_key)
            if not record:
                continue

            # Get the actual thing minus the Mongo ObjectID.
            topology = record[db_key]
            self.te_manager.add_topology(json.loads(topology))
            self.logger.debug(f"Read {db_key}: {topology}")

    def start_sdx_consumer(self, thread_queue, db_instance):
        """Start a consumer thread and process queued messages until stopped.

        Creates a second ``RpcConsumer`` (empty exchange name) whose
        ``start_consumer`` runs in a daemon thread, restores saved state from
        ``db_instance``, then loops over ``thread_queue`` until
        ``stop_threads`` sets the exit event.

        Args:
            thread_queue: Queue the consumer thread fills and this loop drains.
            db_instance: Object providing ``read_from_db(collection, key)``.
        """
        # Local import: the module itself only imports ``Queue``.
        from queue import Empty

        heartbeat_count = 0

        rpc = RpcConsumer(thread_queue, "", self.te_manager)
        consumer_thread = threading.Thread(
            target=rpc.start_consumer, args=(), daemon=True
        )
        consumer_thread.start()

        lc_message_handler = LcMessageHandler(db_instance, self.te_manager)
        parse_helper = ParseHelper()

        latest_topo = {}
        domain_list = []
        num_domain_topos = 0

        # This part reads from DB when SDX controller initially starts.
        # It looks for domain_list, and num_domain_topos, if they are already in DB,
        # Then use the existing ones from DB.
        domain_list_from_db = db_instance.read_from_db("domains", "domain_list")
        latest_topo_from_db = db_instance.read_from_db("topologies", "latest_topo")
        num_domain_topos_from_db = db_instance.read_from_db(
            "topologies", "num_domain_topos"
        )

        if domain_list_from_db:
            domain_list = domain_list_from_db["domain_list"]
            self.logger.debug("Read domain_list from db: ")
            self.logger.debug(domain_list)

        if latest_topo_from_db:
            latest_topo = latest_topo_from_db["latest_topo"]
            self.logger.debug("Read latest_topo from db: ")
            self.logger.debug(latest_topo)

        if num_domain_topos_from_db:
            num_domain_topos = num_domain_topos_from_db["num_domain_topos"]
            self.logger.debug("Read num_domain_topos from db: ")
            self.logger.debug(num_domain_topos)
            self._restore_topologies(db_instance, num_domain_topos)

        while not self._exit_event.is_set():
            # Wait with a timeout rather than blocking forever, so the exit
            # event is re-checked even when no messages arrive.
            try:
                msg = thread_queue.get(timeout=self._QUEUE_POLL_SECONDS)
            except Empty:
                continue

            self.logger.debug("MQ received message:%s", msg)

            if "Heart Beat" in str(msg):
                heartbeat_count += 1
                self.logger.debug("Heart beat received. ID: %s", heartbeat_count)
                continue

            self.logger.info("Saving to database.")
            if not parse_helper.is_json(msg):
                continue

            if "version" in str(msg):
                lc_message_handler.process_lc_json_msg(
                    msg,
                    latest_topo,
                    domain_list,
                    num_domain_topos,
                )
            else:
                self.logger.info("got message from MQ: %s", msg)

    def stop_threads(self):
        """
        Signal threads that we're ready to stop.
        """
        self.logger.info("[MQ] Stopping threads.")
        # Set the flag first so the processing loop still exits even if
        # stop_consuming() raises.
        self._exit_event.set()
        self.channel.stop_consuming()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc