• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

atlanticwave-sdx / sdx-controller / 14672831121

25 Apr 2025 08:21PM UTC coverage: 56.078% (-0.07%) from 56.145%
14672831121

Pull #453

github

web-flow
Merge 6b07ad640 into 03aa84af5
Pull Request #453: Assign oxp_success_count to 0 only if it does not exist

31 of 53 new or added lines in 4 files covered. (58.49%)

1 existing line in 1 file now uncovered.

1181 of 2106 relevant lines covered (56.08%)

1.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

17.07
/sdx_controller/handlers/lc_message_handler.py
1
import json
2✔
2
import logging
2✔
3

4
from sdx_datamodel.connection_sm import ConnectionStateMachine
2✔
5
from sdx_datamodel.constants import Constants, MongoCollections
2✔
6

7
from sdx_controller.handlers.connection_handler import (
2✔
8
    ConnectionHandler,
9
    connection_state_machine,
10
)
11
from sdx_controller.utils.parse_helper import ParseHelper
2✔
12

13
logger = logging.getLogger(__name__)
2✔
14

15

16
class LcMessageHandler:
2✔
17
    def __init__(self, db_instance, te_manager):
2✔
18
        self.db_instance = db_instance
2✔
19
        self.te_manager = te_manager
2✔
20
        self.parse_helper = ParseHelper()
2✔
21
        self.connection_handler = ConnectionHandler(db_instance)
2✔
22

23
    def process_lc_json_msg(
2✔
24
        self,
25
        msg,
26
        latest_topo,
27
        domain_list,
28
    ):
29
        logger.info("MQ received message:" + str(msg))
×
30
        msg_json = json.loads(msg)
×
31

32
        if msg_json.get("msg_type") and msg_json["msg_type"] == "oxp_conn_response":
×
33
            logger.info("Received OXP connection response.")
×
34
            service_id = msg_json.get("service_id")
×
35

36
            if not service_id:
×
37
                return
×
38

39
            connection = self.db_instance.read_from_db(
×
40
                MongoCollections.CONNECTIONS, service_id
41
            )
42

43
            if not connection:
×
44
                return
×
45

46
            breakdown = self.db_instance.read_from_db(
×
47
                MongoCollections.BREAKDOWNS, service_id
48
            )
49
            if not breakdown:
×
50
                logger.info(f"Could not find breakdown for {service_id}")
×
51
                return None
×
52

53
            domains = breakdown.get(service_id)
×
54
            oxp_number = len(domains)
×
55

NEW
56
            connection_json = connection[service_id]
×
57
            oxp_success_count = connection_json.get("oxp_success_count", 0)
×
58
            lc_domain = msg_json.get("lc_domain")
×
59
            oxp_response_code = msg_json.get("oxp_response_code")
×
60
            oxp_response_msg = msg_json.get("oxp_response")
×
61
            oxp_response = connection_json.get("oxp_response")
×
62
            if not oxp_response:
×
63
                oxp_response = {}
×
64
            oxp_response[lc_domain] = (oxp_response_code, oxp_response_msg)
×
65
            connection_json["oxp_response"] = oxp_response
×
66

67
            if oxp_response_code // 100 == 2:
×
68
                if msg_json.get("operation") != "delete":
×
69
                    oxp_success_count += 1
×
70
                    connection_json["oxp_success_count"] = oxp_success_count
×
71
                    if oxp_success_count == oxp_number:
×
72
                        connection_json, _ = connection_state_machine(
×
73
                            connection_json, ConnectionStateMachine.State.UP
74
                        )
75
            else:
76
                if connection_json.get("status") and (
×
77
                    connection_json.get("status")
78
                    == str(ConnectionStateMachine.State.RECOVERING)
79
                ):
80
                    connection_json, _ = connection_state_machine(
×
81
                        connection_json, ConnectionStateMachine.State.ERROR
82
                    )
83
                elif connection_json.get("status") and (
×
84
                    connection_json.get("status")
85
                    != str(ConnectionStateMachine.State.DOWN)
86
                ):
87
                    connection_json, _ = connection_state_machine(
×
88
                        connection_json, ConnectionStateMachine.State.DOWN
89
                    )
90

91
            # ToDo: eg: if 3 oxps in the breakdowns: (1) all up: up (2) parital down: remove_connection()
92
            # release successful oxp circuits if some are down: remove_connection() (3) count the responses
93
            # to finalize the status of the connection.
94
            self.db_instance.add_key_value_pair_to_db(
×
95
                MongoCollections.CONNECTIONS,
96
                service_id,
97
                connection_json,
98
            )
99
            logger.info("Connection updated: " + service_id)
×
100
            return
×
101

102
        # topology message RPC from OXP: no exchange name is defined.
103
        msg_id = msg_json["id"]
×
104
        msg_version = msg_json["version"]
×
105

106
        domain_name = self.parse_helper.find_domain_name(msg_id, ":")
×
107
        msg_json["domain_name"] = domain_name
×
108

109
        db_msg_id = str(msg_id) + "-" + str(msg_version)
×
110
        # add message to db
111
        self.db_instance.add_key_value_pair_to_db(
×
112
            MongoCollections.TOPOLOGIES, db_msg_id, msg
113
        )
114
        logger.info("Save to database complete.")
×
115
        logger.info("message ID:" + str(db_msg_id))
×
116

117
        # Update existing topology
118
        if domain_name in domain_list:
×
119
            logger.info("Updating topo")
×
120
            logger.debug(msg_json)
×
121
            (
×
122
                removed_nodes,
123
                added_nodes,
124
                removed_links,
125
                added_links,
126
            ) = self.te_manager.update_topology(msg_json)
127
            logger.info("Updating topology in TE manager")
×
128
            if removed_links and len(removed_links) > 0:
×
129
                logger.info("Processing removed link.")
×
130
                self.connection_handler.handle_link_removal(
×
131
                    self.te_manager, removed_links
132
                )
133
            failed_links = self.te_manager.get_failed_links()
×
134
            if failed_links:
×
135
                logger.info("Processing link failure.")
×
136
                self.connection_handler.handle_link_failure(
×
137
                    self.te_manager, failed_links
138
                )
139

140
        # Add new topology
141
        else:
142
            domain_list.append(domain_name)
×
143
            self.db_instance.add_key_value_pair_to_db(
×
144
                MongoCollections.DOMAINS, Constants.DOMAIN_LIST, domain_list
145
            )
146
            logger.info("Adding topology to TE manager")
×
147
            self.te_manager.add_topology(msg_json)
×
148

149
        # Save to database
150
        # ToDo: check if there is any change in topology update, if not, do not re-save to db.
151
        logger.info(f"Adding topology {domain_name} to db.")
×
152
        self.db_instance.add_key_value_pair_to_db(
×
153
            MongoCollections.TOPOLOGIES, domain_name, json.dumps(msg_json)
154
        )
155

156
        latest_topo = json.dumps(
×
157
            self.te_manager.topology_manager.get_topology().to_dict()
158
        )
159
        # use 'latest_topo' as PK to save latest topo to db
160
        self.db_instance.add_key_value_pair_to_db(
×
161
            MongoCollections.TOPOLOGIES, Constants.LATEST_TOPOLOGY, latest_topo
162
        )
163
        logger.info("Save to database complete.")
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc