• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

atlanticwave-sdx / sdx-controller / 13484851317

23 Feb 2025 04:28PM UTC coverage: 56.559% (+1.2%) from 55.374%
13484851317

Pull #413

github

web-flow
Merge 90a01b968 into fd240b5f0
Pull Request #413: initial drop w/ comment

33 of 56 new or added lines in 10 files covered. (58.93%)

2 existing lines in 2 files now uncovered.

1121 of 1982 relevant lines covered (56.56%)

1.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

19.4
/sdx_controller/handlers/lc_message_handler.py
1
import json
2✔
2
import logging
2✔
3

4
from sdx_datamodel.constants import Constants, MongoCollections
2✔
5

6
from sdx_controller.handlers.connection_handler import ConnectionHandler
2✔
7
from sdx_controller.utils.parse_helper import ParseHelper
2✔
8

9
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
2✔
10

11

12
class LcMessageHandler:
    """Process JSON messages received from the message queue.

    Two kinds of message are distinguished by ``process_lc_json_msg``:

    * messages whose ``msg_type`` is ``"oxp_conn_response"``, which update
      the stored connection record of the service they refer to;
    * every other message, which is treated as a domain topology and is
      stored in the DB and handed to the TE manager.
    """

    def __init__(self, db_instance, te_manager):
        """Store the collaborators used while processing messages.

        Args:
            db_instance: Database wrapper; ``read_from_db`` and
                ``add_key_value_pair_to_db`` are called on it.
            te_manager: TE manager that owns the in-memory topology.
        """
        self.db_instance = db_instance
        self.te_manager = te_manager
        self.parse_helper = ParseHelper()
        self.connection_handler = ConnectionHandler(db_instance)

    def process_lc_json_msg(
        self,
        msg,
        latest_topo,
        domain_list,
    ):
        """Parse one message and apply it to the DB and the TE manager.

        Args:
            msg: The raw message, a JSON document accepted by
                ``json.loads``.
            latest_topo: Unused. Kept only so existing callers keep
                working; the latest topology is always re-read from the
                TE manager before it is saved.
            domain_list: List of known domain names. A domain that is not
                yet in the list is appended to it *in place* and the
                updated list is saved to the DB.

        Returns:
            None.

        Raises:
            json.JSONDecodeError: If ``msg`` is not valid JSON.
            KeyError: If a topology message lacks ``"id"`` or
                ``"version"``.
        """
        logger.info("MQ received message:%s", msg)
        msg_json = json.loads(msg)

        if msg_json.get("msg_type") == "oxp_conn_response":
            self._handle_oxp_conn_response(msg_json)
            return

        msg_id = msg_json["id"]
        msg_version = msg_json["version"]

        domain_name = self.parse_helper.find_domain_name(msg_id, ":")
        msg_json["domain_name"] = domain_name

        # Keep the raw message as received, keyed by "<id>-<version>".
        db_msg_id = f"{msg_id}-{msg_version}"
        self.db_instance.add_key_value_pair_to_db(
            MongoCollections.TOPOLOGIES, db_msg_id, msg
        )
        logger.info("Save to database complete.")
        logger.info("message ID:%s", db_msg_id)

        if domain_name in domain_list:
            self._update_existing_topology(msg_json)
        else:
            domain_list.append(domain_name)
            self.db_instance.add_key_value_pair_to_db(
                MongoCollections.DOMAINS, Constants.DOMAIN_LIST, domain_list
            )
            logger.info("Adding topology to TE manager")
            self.te_manager.add_topology(msg_json)

        # Persist once, for both branches. The update branch used to write
        # the very same two records a first time before reaching this point.
        self._save_topology(domain_name, msg_json)

    def _handle_oxp_conn_response(self, msg_json):
        """Record an OXP connection response on the stored connection."""
        logger.info("Received OXP connection response.")
        service_id = msg_json.get("service_id")
        if not service_id:
            return

        connection = self.db_instance.read_from_db(
            MongoCollections.CONNECTIONS, service_id
        )
        if not connection:
            return

        connection_json = json.loads(connection[service_id])
        oxp_response_code = msg_json.get("oxp_response_code")
        connection_json["oxp_response_code"] = oxp_response_code
        connection_json["oxp_response"] = msg_json.get("oxp_response")

        # A missing or non-numeric code used to raise TypeError on ``//``
        # and abort before the response was saved. Such a code is neither a
        # success nor a reported failure, so the status is left untouched.
        try:
            succeeded = int(oxp_response_code) // 100 == 2
        except (TypeError, ValueError, OverflowError):
            logger.warning(
                "OXP response for %s has no usable response code: %r",
                service_id,
                oxp_response_code,
            )
            succeeded = None

        if succeeded is False:
            connection_json["status"] = "down"
        elif succeeded and not connection_json.get("status"):
            # Only fill in a status that is absent/empty; never overwrite.
            connection_json["status"] = "up"

        self.db_instance.add_key_value_pair_to_db(
            MongoCollections.CONNECTIONS,
            service_id,
            json.dumps(connection_json),
        )
        # %s formatting: a non-string service id must not raise here.
        logger.info("Connection updated: %s", service_id)

    def _update_existing_topology(self, msg_json):
        """Merge a known domain's topology and react to lost links."""
        logger.info("Updating topo")
        logger.debug(msg_json)
        _removed_nodes, _added_nodes, removed_links, _added_links = (
            self.te_manager.update_topology(msg_json)
        )
        logger.info("Updating topology in TE manager")

        if removed_links:
            logger.info("Processing removed link.")
            self.connection_handler.handle_link_failure(
                self.te_manager, removed_links
            )

        failed_links = self.te_manager.get_failed_links()
        if failed_links:
            logger.info("Processing link failure.")
            self.connection_handler.handle_link_failure(
                self.te_manager, failed_links
            )

    def _save_topology(self, domain_name, msg_json):
        """Save the domain topology and the TE manager's latest topology."""
        logger.info(f"Adding topology {domain_name} to db.")
        self.db_instance.add_key_value_pair_to_db(
            MongoCollections.TOPOLOGIES, domain_name, json.dumps(msg_json)
        )

        # TODO: use TEManager API directly; but TEManager does not
        # expose a `get_topology()` method yet.
        latest_topology = json.dumps(
            self.te_manager.topology_manager.get_topology().to_dict()
        )
        # use 'latest_topo' as PK to save latest topo to db
        self.db_instance.add_key_value_pair_to_db(
            MongoCollections.TOPOLOGIES,
            Constants.LATEST_TOPOLOGY,
            latest_topology,
        )
        logger.info("Save to database complete.")
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc