• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

atlanticwave-sdx / sdx-controller / 18915548677

29 Oct 2025 04:44PM UTC coverage: 55.206% (-0.03%) from 55.233%
18915548677

Pull #499

github

web-flow
Merge cf9395424 into b4e05fd58
Pull Request #499: Change domain list to dict

34 of 71 new or added lines in 3 files covered. (47.89%)

3 existing lines in 1 file now uncovered.

1246 of 2257 relevant lines covered (55.21%)

1.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

16.67
/sdx_controller/handlers/lc_message_handler.py
1
import json
2✔
2
import logging
2✔
3

4
from sdx_datamodel.connection_sm import ConnectionStateMachine
2✔
5
from sdx_datamodel.constants import Constants, DomainStatus, MongoCollections
2✔
6

7
from sdx_controller.handlers.connection_handler import (
2✔
8
    ConnectionHandler,
9
    connection_state_machine,
10
)
11
from sdx_controller.utils.parse_helper import ParseHelper
2✔
12

13
logger = logging.getLogger(__name__)
2✔
14

15

16
class LcMessageHandler:
    """Process JSON messages received from the message queue.

    Two kinds of message are handled by :meth:`process_lc_json_msg`:

    * OXP connection responses (``msg_type == "oxp_conn_response"``), which
      update the stored connection record and its state.
    * Topology messages (everything else), which are persisted and pushed
      into the TE manager.
    """

    def __init__(self, db_instance, te_manager):
        """Store collaborators and create the helpers used during processing.

        Args:
            db_instance: Database wrapper. This class calls its
                ``get_value_from_db`` and ``add_key_value_pair_to_db`` methods.
            te_manager: TE manager. This class calls its ``update_topology``
                and ``add_topology`` methods and reads
                ``topology_manager.get_topology()``.
        """
        self.db_instance = db_instance
        self.te_manager = te_manager
        self.parse_helper = ParseHelper()
        self.connection_handler = ConnectionHandler(db_instance)

    def process_lc_json_msg(
        self,
        msg,
        latest_topo,
        domain_dict,
    ):
        """Parse one message and dispatch it by ``msg_type``.

        Args:
            msg: JSON document (``str``/``bytes``) received from the queue.
            latest_topo: Unused. The latest topology is always re-read from
                the TE manager; the parameter is kept so existing callers
                keep working.
            domain_dict: Mapping of domain name to status. A domain seen for
                the first time is added to it in place and the mapping is
                written to the database.

        Returns:
            None.

        Raises:
            json.JSONDecodeError: If ``msg`` is not valid JSON.
            KeyError: If a topology message lacks ``"id"`` or ``"version"``.
        """
        logger.info("MQ received message:%s", msg)
        msg_json = json.loads(msg)

        if msg_json.get("msg_type") == "oxp_conn_response":
            self._handle_oxp_conn_response(msg_json)
            return None

        # Topology message RPC from OXP: no exchange name is defined.
        self._handle_topology_msg(msg_json, domain_dict)
        return None

    @staticmethod
    def _is_success_code(code):
        """Return True when ``code`` is a 2xx response code.

        Anything that cannot be read as an integer (``None``, free text, ...)
        is treated as a non-success instead of raising.
        """
        try:
            return int(code) // 100 == 2
        except (TypeError, ValueError, OverflowError):
            return False

    def _handle_oxp_conn_response(self, msg_json):
        """Record an OXP connection response and advance the connection state."""
        logger.info("Received OXP connection response.")

        service_id = msg_json.get("service_id")
        if not service_id:
            logger.warning("OXP connection response has no service_id; ignored.")
            return

        connection = self.db_instance.get_value_from_db(
            MongoCollections.CONNECTIONS, service_id
        )
        if not connection:
            logger.warning("Could not find connection for %s", service_id)
            return

        breakdown = self.db_instance.get_value_from_db(
            MongoCollections.BREAKDOWNS, service_id
        )
        if not breakdown:
            logger.info(f"Could not find breakdown for {service_id}")
            return

        # Remember the latest (code, message) answer of each responding domain.
        oxp_response_code = msg_json.get("oxp_response_code")
        oxp_response = connection.get("oxp_response") or {}
        oxp_response[msg_json.get("lc_domain")] = (
            oxp_response_code,
            msg_json.get("oxp_response"),
        )
        connection["oxp_response"] = oxp_response

        states = ConnectionStateMachine.State
        status = connection.get("status")
        recovering = status == str(states.RECOVERING)

        if self._is_success_code(oxp_response_code):
            # Successful deletes do not count towards provisioning progress.
            if msg_json.get("operation") != "delete":
                oxp_success_count = connection.get("oxp_success_count", 0) + 1
                connection["oxp_success_count"] = oxp_success_count
                # The breakdown holds one entry per OXP; the connection is up
                # once the success count reaches that number.
                if oxp_success_count == len(breakdown):
                    if recovering:
                        # A recovering connection goes through
                        # UNDER_PROVISIONING before it is marked UP.
                        connection, _ = connection_state_machine(
                            connection, states.UNDER_PROVISIONING
                        )
                    connection, _ = connection_state_machine(connection, states.UP)
        else:
            if oxp_response_code is None:
                logger.warning(
                    "OXP connection response for %s has no oxp_response_code; "
                    "treating it as a failure.",
                    service_id,
                )
            if recovering:
                connection, _ = connection_state_machine(connection, states.ERROR)
            elif status and status not in (str(states.DOWN), str(states.ERROR)):
                connection, _ = connection_state_machine(connection, states.DOWN)

        # ToDo: e.g. if 3 OXPs are in the breakdown: (1) all up: up
        # (2) partial down: remove_connection() to release the successful OXP
        # circuits (3) count the responses to finalize the connection status.
        self.db_instance.add_key_value_pair_to_db(
            MongoCollections.CONNECTIONS,
            service_id,
            connection,
        )
        logger.info("Connection updated: %s", service_id)

    def _handle_topology_msg(self, msg_json, domain_dict):
        """Persist a topology message and apply it to the TE manager."""
        msg_id = msg_json["id"]
        msg_version = msg_json["version"]

        domain_name = self.parse_helper.find_domain_name(msg_id, ":")
        msg_json["domain_name"] = domain_name

        # Versioned copy of the message, keyed by "<id>-<version>".
        db_msg_id = f"{msg_id}-{msg_version}"
        self.db_instance.add_key_value_pair_to_db(
            MongoCollections.TOPOLOGIES, db_msg_id, msg_json
        )
        logger.info("Save to database complete.")
        logger.info("message ID:%s", db_msg_id)

        if domain_name in domain_dict:
            self._update_existing_topology(msg_json)
        else:
            # First message from this domain: register it, then add its topology.
            domain_dict[domain_name] = DomainStatus.UP
            self.db_instance.add_key_value_pair_to_db(
                MongoCollections.DOMAINS, Constants.DOMAIN_DICT, domain_dict
            )
            logger.info("Adding topology to TE manager")
            self.te_manager.add_topology(msg_json)

        # ToDo: check if there is any change in topology update, if not, do
        # not re-save to db.
        logger.info(f"Adding topology {domain_name} to db.")
        self.db_instance.add_key_value_pair_to_db(
            MongoCollections.TOPOLOGIES, msg_id, msg_json
        )

        # Use 'latest_topo' as PK to save the latest topology to the db.
        current_topo = self.te_manager.topology_manager.get_topology().to_dict()
        self.db_instance.add_key_value_pair_to_db(
            MongoCollections.TOPOLOGIES, Constants.LATEST_TOPOLOGY, current_topo
        )
        logger.info("Save to database complete.")

    def _update_existing_topology(self, msg_json):
        """Apply a topology update for a known domain and react to its changes."""
        logger.info("Updating topo")
        logger.debug(msg_json)
        (
            _removed_nodes,
            _added_nodes,
            removed_links,
            _added_links,
            uni_ports_up_to_down,
            uni_ports_down_to_up,
        ) = self.te_manager.update_topology(msg_json)
        logger.info("Updating topology in TE manager")

        if removed_links:
            logger.info("Processing removed link.")
            self.connection_handler.handle_link_removal(
                self.te_manager, removed_links
            )

        # NOTE: handling of te_manager.get_failed_links() through
        # connection_handler.handle_link_failure() is currently disabled.

        if uni_ports_up_to_down:
            logger.info(f"Processing uni ports up to down:{uni_ports_up_to_down}")
            self.connection_handler.handle_uni_ports_up_to_down(uni_ports_up_to_down)

        if uni_ports_down_to_up:
            logger.info(f"Processing uni ports down to up:{uni_ports_down_to_up}")
            self.connection_handler.handle_uni_ports_down_to_up(uni_ports_down_to_up)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc