• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

atlanticwave-sdx / sdx-controller / 17100863166

20 Aug 2025 02:07PM UTC coverage: 55.382% (-0.1%) from 55.53%
17100863166

Pull #477

github

web-flow
Merge c0f616f0d into 36b845ee6
Pull Request #477: Update to 2.0 API format

15 of 17 new or added lines in 2 files covered. (88.24%)

8 existing lines in 2 files now uncovered.

1204 of 2174 relevant lines covered (55.38%)

1.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

15.73
/sdx_controller/handlers/lc_message_handler.py
1
import json
2✔
2
import logging
2✔
3

4
from sdx_datamodel.connection_sm import ConnectionStateMachine
2✔
5
from sdx_datamodel.constants import Constants, MongoCollections
2✔
6

7
from sdx_controller.handlers.connection_handler import (
2✔
8
    ConnectionHandler,
9
    connection_state_machine,
10
)
11
from sdx_controller.utils.parse_helper import ParseHelper
2✔
12

13
logger = logging.getLogger(__name__)
2✔
14

15

16
class LcMessageHandler:
    """Process JSON messages received from the message queue.

    Two kinds of message are handled:

    * messages whose ``msg_type`` is ``"oxp_conn_response"``, which update the
      stored connection identified by ``service_id``;
    * every other message, which is treated as a topology message and used
      to add or update a domain topology.
    """

    def __init__(self, db_instance, te_manager):
        """Store the collaborators used while processing messages.

        Args:
            db_instance: Database wrapper. This class calls its
                ``get_value_from_db`` and ``add_key_value_pair_to_db`` methods.
            te_manager: TE manager. This class calls its ``update_topology``,
                ``add_topology`` and ``get_failed_links`` methods and reads
                ``topology_manager.get_topology()``.
        """
        self.db_instance = db_instance
        self.te_manager = te_manager
        self.parse_helper = ParseHelper()
        self.connection_handler = ConnectionHandler(db_instance)

    def process_lc_json_msg(
        self,
        msg,
        latest_topo,
        domain_list,
    ):
        """Parse one message and dispatch it according to its ``msg_type``.

        Args:
            msg: JSON document (``str``/``bytes``) accepted by ``json.loads``.
            latest_topo: Not read by this method; kept so existing callers
                keep working. The latest topology is always re-read from the
                TE manager before it is saved.
            domain_list: List of known domain names. It is mutated in place
                (and persisted) when a topology for a new domain arrives.

        Returns:
            None.

        Raises:
            json.JSONDecodeError: If ``msg`` is not valid JSON.
            KeyError: If a topology message has no ``id`` or ``version``.
        """
        # Lazy %-formatting produces the same text as the old concatenation.
        logger.info("MQ received message:%s", msg)
        msg_json = json.loads(msg)

        if msg_json.get("msg_type") == "oxp_conn_response":
            self._handle_oxp_conn_response(msg_json)
            return

        # topology message RPC from OXP: no exchange name is defined.
        self._handle_topology_message(msg_json, domain_list)

    def _handle_oxp_conn_response(self, msg_json):
        """Record an OXP connection response and update the connection status."""
        logger.info("Received OXP connection response.")
        service_id = msg_json.get("service_id")
        if not service_id:
            return

        connection = self.db_instance.get_value_from_db(
            MongoCollections.CONNECTIONS, service_id
        )
        if not connection:
            return

        breakdown = self.db_instance.get_value_from_db(
            MongoCollections.BREAKDOWNS, service_id
        )
        if not breakdown:
            logger.info(f"Could not find breakdown for {service_id}")
            return

        oxp_response_code = msg_json.get("oxp_response_code")
        try:
            succeeded = oxp_response_code // 100 == 2
        except TypeError:
            # A missing (None) or non-numeric code cannot be classified as
            # success or failure; ignore the message like the other
            # malformed-response cases above instead of crashing.
            logger.warning(
                "Ignoring OXP response for %s: invalid oxp_response_code %r",
                service_id,
                oxp_response_code,
            )
            return

        # Remember the latest (code, message) pair reported by each domain.
        oxp_response = connection.get("oxp_response") or {}
        oxp_response[msg_json.get("lc_domain")] = (
            oxp_response_code,
            msg_json.get("oxp_response"),
        )
        connection["oxp_response"] = oxp_response

        status = connection.get("status")
        recovering = str(ConnectionStateMachine.State.RECOVERING)

        if succeeded:
            # Successful responses to delete operations are recorded above
            # but do not count towards bringing the connection up.
            if msg_json.get("operation") != "delete":
                oxp_success_count = connection.get("oxp_success_count", 0) + 1
                connection["oxp_success_count"] = oxp_success_count
                # One breakdown entry is expected to succeed per OXP.
                if oxp_success_count == len(breakdown):
                    if status == recovering:
                        connection, _ = connection_state_machine(
                            connection,
                            ConnectionStateMachine.State.UNDER_PROVISIONING,
                        )
                    connection, _ = connection_state_machine(
                        connection, ConnectionStateMachine.State.UP
                    )
        elif status == recovering:
            connection, _ = connection_state_machine(
                connection, ConnectionStateMachine.State.ERROR
            )
        elif status and status not in (
            str(ConnectionStateMachine.State.DOWN),
            str(ConnectionStateMachine.State.ERROR),
        ):
            # The connection is moved to DOWN by way of MODIFYING.
            connection, _ = connection_state_machine(
                connection, ConnectionStateMachine.State.MODIFYING
            )
            connection, _ = connection_state_machine(
                connection, ConnectionStateMachine.State.DOWN
            )

        # ToDo: eg: if 3 oxps in the breakdowns: (1) all up: up (2) partial down: remove_connection()
        # release successful oxp circuits if some are down: remove_connection() (3) count the responses
        # to finalize the status of the connection.
        self.db_instance.add_key_value_pair_to_db(
            MongoCollections.CONNECTIONS,
            service_id,
            connection,
        )
        # %s formatting also copes with a non-string service_id.
        logger.info("Connection updated: %s", service_id)

    def _handle_topology_message(self, msg_json, domain_list):
        """Persist a topology message and add/update it in the TE manager."""
        msg_id = msg_json["id"]
        msg_version = msg_json["version"]

        domain_name = self.parse_helper.find_domain_name(msg_id, ":")
        msg_json["domain_name"] = domain_name

        # add message to db, keyed by "<id>-<version>"
        db_msg_id = f"{msg_id}-{msg_version}"
        self.db_instance.add_key_value_pair_to_db(
            MongoCollections.TOPOLOGIES, db_msg_id, msg_json
        )
        logger.info("Save to database complete.")
        logger.info("message ID:%s", db_msg_id)

        if domain_name in domain_list:
            # Update existing topology
            logger.info("Updating topo")
            logger.debug(msg_json)
            (
                _removed_nodes,
                _added_nodes,
                removed_links,
                _added_links,
                uni_ports_up_to_down,
                uni_ports_down_to_up,
            ) = self.te_manager.update_topology(msg_json)
            logger.info("Updating topology in TE manager")
            if removed_links:
                logger.info("Processing removed link.")
                self.connection_handler.handle_link_removal(
                    self.te_manager, removed_links
                )
            failed_links = self.te_manager.get_failed_links()
            if failed_links:
                logger.info("Processing link failure.")
                self.connection_handler.handle_link_failure(
                    self.te_manager, failed_links
                )
            if uni_ports_up_to_down:
                logger.info("Processing uni ports up to down.")
                self.connection_handler.handle_uni_ports_up_to_down(
                    uni_ports_up_to_down
                )
            if uni_ports_down_to_up:
                logger.info("Processing uni ports down to up.")
                self.connection_handler.handle_uni_ports_down_to_up(
                    uni_ports_down_to_up
                )
        else:
            # Add new topology
            domain_list.append(domain_name)
            self.db_instance.add_key_value_pair_to_db(
                MongoCollections.DOMAINS, Constants.DOMAIN_LIST, domain_list
            )
            logger.info("Adding topology to TE manager")
            self.te_manager.add_topology(msg_json)

        # Save to database
        # ToDo: check if there is any change in topology update, if not, do not re-save to db.
        logger.info(f"Adding topology {domain_name} to db.")
        self.db_instance.add_key_value_pair_to_db(
            MongoCollections.TOPOLOGIES, msg_id, msg_json
        )

        latest_topo = self.te_manager.topology_manager.get_topology().to_dict()
        # use 'latest_topo' as PK to save latest topo to db
        self.db_instance.add_key_value_pair_to_db(
            MongoCollections.TOPOLOGIES, Constants.LATEST_TOPOLOGY, latest_topo
        )
        logger.info("Save to database complete.")
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc