
atlanticwave-sdx / sdx-controller / build 13484851317

23 Feb 2025 04:28PM UTC coverage: 56.559% (+1.2%) from 55.374%

Pull Request #413: initial drop w/ comment (github / web-flow)
Merge 90a01b968 into fd240b5f0

33 of 56 new or added lines in 10 files covered. (58.93%)
2 existing lines in 2 files now uncovered.
1121 of 1982 relevant lines covered (56.56%)
1.13 hits per line

Source file: /sdx_controller/handlers/connection_handler.py (72.66% covered)
import json
import logging
import time
import traceback
from typing import Tuple

from sdx_datamodel.constants import Constants, MessageQueueNames, MongoCollections
from sdx_datamodel.parsing.exceptions import ServiceNotSupportedException
from sdx_pce.load_balancing.te_solver import TESolver
from sdx_pce.topology.temanager import TEManager
from sdx_pce.utils.exceptions import RequestValidationError, TEError

from sdx_controller.messaging.topic_queue_producer import TopicQueueProducer
from sdx_controller.models.simple_link import SimpleLink
from sdx_controller.utils.parse_helper import ParseHelper

logger = logging.getLogger(__name__)
logging.getLogger("pika").setLevel(logging.WARNING)


class ConnectionHandler:
    def __init__(self, db_instance):
        self.db_instance = db_instance
        self.parse_helper = ParseHelper()

    def _send_breakdown_to_lc(self, breakdown, operation, connection_request):
        logger.debug(f"-- BREAKDOWN: {json.dumps(breakdown)}")

        if breakdown is None:
            return "Could not break down the solution", 400

        # Read the link-to-connections mapping once, rather than issuing
        # the same DB query twice.
        link_connections_record = self.db_instance.read_from_db(
            MongoCollections.LINKS, Constants.LINK_CONNECTIONS_DICT
        )
        link_connections_dict_json = (
            link_connections_record[Constants.LINK_CONNECTIONS_DICT]
            if link_connections_record
            else None
        )

        if link_connections_dict_json:
            link_connections_dict = json.loads(link_connections_dict_json)
        else:
            link_connections_dict = {}

        interdomain_a, interdomain_b = None, None
        for domain, link in breakdown.items():
            port_list = []
            for key in link.keys():
                if "uni_" in key and "port_id" in link[key]:
                    port_list.append(link[key]["port_id"])

            if port_list:
                simple_link = SimpleLink(port_list).to_string()

                if simple_link not in link_connections_dict:
                    link_connections_dict[simple_link] = []

                if (
                    operation == "post"
                    and connection_request not in link_connections_dict[simple_link]
                ):
                    link_connections_dict[simple_link].append(connection_request)

                if (
                    operation == "delete"
                    and connection_request in link_connections_dict[simple_link]
                ):
                    link_connections_dict[simple_link].remove(connection_request)

                self.db_instance.add_key_value_pair_to_db(
                    MongoCollections.LINKS,
                    Constants.LINK_CONNECTIONS_DICT,
                    json.dumps(link_connections_dict),
                )

            if interdomain_a:
                interdomain_b = link.get("uni_a", {}).get("port_id")
            else:
                interdomain_a = link.get("uni_z", {}).get("port_id")

            if interdomain_a and interdomain_b:
                simple_link = SimpleLink([interdomain_a, interdomain_b]).to_string()

                if simple_link not in link_connections_dict:
                    link_connections_dict[simple_link] = []

                if (
                    operation == "post"
                    and connection_request not in link_connections_dict[simple_link]
                ):
                    link_connections_dict[simple_link].append(connection_request)

                if (
                    operation == "delete"
                    and connection_request in link_connections_dict[simple_link]
                ):
                    link_connections_dict[simple_link].remove(connection_request)

                self.db_instance.add_key_value_pair_to_db(
                    MongoCollections.LINKS,
                    Constants.LINK_CONNECTIONS_DICT,
                    json.dumps(link_connections_dict),
                )

                interdomain_a = link.get("uni_z", {}).get("port_id")

            logger.debug(f"Attempting to publish domain: {domain}, link: {link}")

            # From "urn:ogf:network:sdx:topology:amlight.net", attempt to
            # extract a string like "amlight".
            domain_name = self.parse_helper.find_domain_name(domain, ":") or f"{domain}"
            exchange_name = MessageQueueNames.CONNECTIONS

            logger.debug(
                f"Doing '{operation}' operation for '{link}' with exchange_name: {exchange_name}, "
                f"routing_key: {domain_name}"
            )
            mq_link = {
                "operation": operation,
                "service_id": connection_request.get("id"),
                "link": link,
            }
            producer = TopicQueueProducer(
                timeout=5, exchange_name=exchange_name, routing_key=domain_name
            )
            producer.call(json.dumps(mq_link))
            producer.stop_keep_alive()
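            # For illustration only: a "post" operation for a service whose
            # id is "5b7df397-2269-489b-8e03-f256461265a0", against domain
            # "urn:ogf:network:sdx:topology:amlight.net", would publish with
            # routing key "amlight" a payload shaped roughly like:
            #
            #     {
            #         "operation": "post",
            #         "service_id": "5b7df397-2269-489b-8e03-f256461265a0",
            #         "link": {"name": ..., "uni_a": {...}, "uni_z": {...}}
            #     }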

        # We get here only if all the previous steps were successful.
        return "Connection published", 200

    def place_connection(
        self, te_manager: TEManager, connection_request: dict
    ) -> Tuple[str, int]:
        """
        Do the actual work of creating a connection.

        This method will call the PCE library to generate a breakdown
        across relevant domains, and then send individual connection
        requests to each of those domains.

        Note that we can return early if things fail.  Return value is
        a tuple of the form (reason, HTTP code).
        """
        # for num, val in enumerate(te_manager.get_topology_map().values()):
        #     logger.debug(f"TE topology #{num}: {val}")

        graph = te_manager.generate_graph_te()
        if graph is None:
            return "No SDX topology found", 424
        try:
            traffic_matrix = te_manager.generate_traffic_matrix(
                connection_request=connection_request
            )
        except RequestValidationError as request_err:
            err = traceback.format_exc().replace("\n", ", ")
            logger.error(
                f"Error when parsing and validating request: {request_err} - {err}"
            )
            return f"Error: {request_err}", request_err.request_code
        except ServiceNotSupportedException as service_err:
            err = traceback.format_exc().replace("\n", ", ")
            logger.error(
                f"Error when parsing and validating request: {service_err} - {err}"
            )
            return f"Error: {service_err}", 402

        if traffic_matrix is None:
            return (
                "Request does not have a valid JSON or body is incomplete/incorrect",
                400,
            )

        logger.info(f"Generated graph: '{graph}', traffic matrix: '{traffic_matrix}'")
        try:
            conn = te_manager.requests_connectivity(traffic_matrix)
            if conn is False:
                logger.error(f"Graph connectivity: {conn}")
                raise TEError("No path is available, the graph is not connected", 412)
        except TEError as te_err:
            return f"PCE error: {te_err}", te_err.te_code

        solver = TESolver(graph, traffic_matrix)
        solution = solver.solve()
        logger.debug(f"TESolver result: {solution}")

        if solution is None or solution.connection_map is None:
            return "Could not solve the request", 410

        try:
            breakdown = te_manager.generate_connection_breakdown(
                solution, connection_request
            )
            self.db_instance.add_key_value_pair_to_db(
                MongoCollections.BREAKDOWNS, connection_request["id"], breakdown
            )
            status, code = self._send_breakdown_to_lc(
                breakdown, "post", connection_request
            )
            logger.debug(f"Breakdown sent to LC, status: {status}, code: {code}")
            return status, code
        except TEError as te_err:
            # We could probably return te_err.te_code instead of 400,
207
            # but I don't think PCE should use HTTP error codes,
208
            # because that violates abstraction boundaries.
209
            return f"PCE error: {te_err}", te_err.te_code
×
210
        except Exception as e:
×
211
            err = traceback.format_exc().replace("\n", ", ")
×
212
            logger.error(f"Error when generating/publishing breakdown: {e} - {err}")
×
213
            return f"Error: {e}", 410
×

    def archive_connection(self, service_id) -> None:
        connection_request = self.db_instance.read_from_db(
            MongoCollections.CONNECTIONS, service_id
        )
        if not connection_request:
            return

        connection_request_str = connection_request[service_id]
        self.db_instance.delete_one_entry(MongoCollections.CONNECTIONS, service_id)

        historical_connections = self.db_instance.read_from_db(
            MongoCollections.HISTORICAL_CONNECTIONS, service_id
        )
        # Current timestamp in seconds
        timestamp = int(time.time())
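        # Each archived entry below is stored as a JSON string keyed by the
        # archival timestamp, e.g. (illustrative shape):
        #
        #     '{"<timestamp>": { ...original connection request... }}'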

        if historical_connections:
            historical_connections_list = historical_connections[service_id]
            historical_connections_list.append(
                json.dumps({timestamp: json.loads(connection_request_str)})
            )
            self.db_instance.add_key_value_pair_to_db(
                MongoCollections.HISTORICAL_CONNECTIONS,
                service_id,
                historical_connections_list,
            )
        else:
            self.db_instance.add_key_value_pair_to_db(
                MongoCollections.HISTORICAL_CONNECTIONS,
                service_id,
                [json.dumps({timestamp: json.loads(connection_request_str)})],
            )
        logger.debug(f"Archived connection: {service_id}")

    def remove_connection(self, te_manager, service_id) -> Tuple[str, int]:
        te_manager.delete_connection(service_id)
        connection_request = self.db_instance.read_from_db(
            MongoCollections.CONNECTIONS, service_id
        )
        if not connection_request:
            return "Did not find connection request, cannot remove connection", 404

        connection_request = connection_request[service_id]

        breakdown = self.db_instance.read_from_db(
            MongoCollections.BREAKDOWNS, service_id
        )
        if not breakdown:
            return "Did not find breakdown, cannot remove connection", 404
        breakdown = breakdown[service_id]

        try:
            status, code = self._send_breakdown_to_lc(
                breakdown, "delete", json.loads(connection_request)
            )
            self.db_instance.delete_one_entry(MongoCollections.BREAKDOWNS, service_id)
            self.archive_connection(service_id)
            logger.debug(f"Breakdown sent to LC, status: {status}, code: {code}")
            return status, code
        except Exception as e:
            logger.debug(f"Error when removing breakdown: {e}")
            return f"Error when removing breakdown: {e}", 400

    def handle_link_failure(self, te_manager, failed_links):
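        """
        Remove and re-place connections that traverse any of the failed links.

        Each entry of `failed_links` is expected to carry an "id" and a
        "ports" list; ports may be plain port-id strings or dicts with an
        "id" field, as handled below.
        """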
        logger.debug("Handling connections that contain failed link.")
        link_connections_dict_str = self.db_instance.read_from_db(
            MongoCollections.LINKS, Constants.LINK_CONNECTIONS_DICT
        )

        if (
            not link_connections_dict_str
            or not link_connections_dict_str[Constants.LINK_CONNECTIONS_DICT]
        ):
            logger.debug("No connection has been placed yet.")
            return

        link_connections_dict = json.loads(
            link_connections_dict_str[Constants.LINK_CONNECTIONS_DICT]
        )

        for link in failed_links:
            # Check for "ports" before logging, so that a link record
            # without ports does not raise a KeyError here.
            if "ports" not in link:
                continue
            logger.info(f"Handling link failure on {link['id']} ({link['ports']})")
            port_list = []
            for port in link["ports"]:
                port_id = port if isinstance(port, str) else port.get("id")
                if not port_id:
                    continue
                port_list.append(port_id)

            simple_link = SimpleLink(port_list).to_string()

            if simple_link in link_connections_dict:
                logger.debug("Found failed link record!")
                connections = link_connections_dict[simple_link]
                # Iterate over a copy, since entries may be removed from the
                # underlying list as connections are re-placed below.
                for connection in list(connections):
                    if "id" not in connection:
                        continue
                    logger.info(
                        f"Connection {connection['id']} affected by link {link['id']}"
                    )

                    try:
                        self.remove_connection(te_manager, connection["id"])
                    except Exception as err:
                        logger.info(
                            f"Encountered error when deleting connection: {err}"
                        )
                        continue

                    connections.remove(connection)
                    logger.debug("Removed connection:")
                    logger.debug(connection)
                    _reason, code = self.place_connection(te_manager, connection)
                    if code // 100 == 2:
                        self.db_instance.add_key_value_pair_to_db(
                            MongoCollections.CONNECTIONS,
                            connection["id"],
                            json.dumps(connection),
                        )

    def get_archived_connections(self, service_id: str):
        historical_connections = self.db_instance.read_from_db(
            MongoCollections.HISTORICAL_CONNECTIONS, service_id
        )
        if not historical_connections:
            return None
        return historical_connections[service_id]


def get_connection_status(db, service_id: str):
    """
    Form a response to `GET /l2vpn/1.0/{service_id}`.
    """
    assert db is not None
    assert service_id is not None

    breakdown = db.read_from_db(MongoCollections.BREAKDOWNS, service_id)
    if not breakdown:
        logger.info(f"Could not find breakdown for {service_id}")
        return None

    logger.info(f"breakdown for {service_id}: {breakdown}")

    # The breakdown we read from DB is in this shape:
    #
    # {
    #     "_id": ObjectId("66ec71770c7022eb0922f41a"),
    #     "5b7df397-2269-489b-8e03-f256461265a0": {
    #         "urn:sdx:topology:amlight.net": {
    #             "name": "AMLIGHT_vlan_1000_10001",
    #             "dynamic_backup_path": True,
    #             "uni_a": {
    #                 "tag": {"value": 1000, "tag_type": 1},
    #                 "port_id": "urn:sdx:port:amlight.net:A1:1",
    #             },
    #             "uni_z": {
    #                 "tag": {"value": 10001, "tag_type": 1},
    #                 "port_id": "urn:sdx:port:amlight.net:B1:3",
    #             },
    #         }
    #     },
    # }
    #
    # We need to shape that into this form, at a minimum:
    #
    # {
    #     "c73da8e1-5d03-4620-a1db-7cdf23e8978c": {
    #         "service_id": "c73da8e1-5d03-4620-a1db-7cdf23e8978c",
    #         "name": "new-connection",
    #         "endpoints": [
    #             {
    #                 "port_id": "urn:sdx:port:amlight.net:A1:1",
    #                 "vlan": "150"
    #             },
    #             {
    #                 "port_id": "urn:sdx:port:amlight:B1:1",
    #                 "vlan": "300"
    #             }
    #         ],
    #     }
    # }
    #
    # See https://sdx-docs.readthedocs.io/en/latest/specs/provisioning-api-1.0.html#request-format-2
    #

    domains = breakdown.get(service_id)
    logger.info(f"domains for {service_id}: {domains.keys()}")

    # Find the name and description from the original connection
    # request for this service_id.
    name = "unknown"
    description = "unknown"
    qos_metrics = {}
    scheduling = {}
    notifications = {}
    # Initialize the OXP response fields here so they are defined even
    # when the original request cannot be found below.
    oxp_response_code = None
    oxp_response = None

    endpoints = list()
    request_endpoints = []
    response_endpoints = []
    request_uni_a_id = None
    request_uni_z_id = None

    request = db.read_from_db(MongoCollections.CONNECTIONS, service_id)
    if not request:
        logger.error(f"Can't find a connection request for {service_id}")
        # TODO: we're in a strange state here. Should we panic?
    else:
        logger.info(f"Found request for {service_id}: {request}")
        # We seem to have saved the original request in the form of a
        # string into the DB, not a record.
        request_dict = json.loads(request.get(service_id))
        name = request_dict.get("name")
        description = request_dict.get("description")
        qos_metrics = request_dict.get("qos_metrics")
        scheduling = request_dict.get("scheduling")
        notifications = request_dict.get("notifications")
        oxp_response_code = request_dict.get("oxp_response_code")
        oxp_response = request_dict.get("oxp_response")
        if request_dict.get("endpoints") is not None:  # spec version 2.0.0
            request_endpoints = request_dict.get("endpoints")
            request_uni_a = request_endpoints[0]
            request_uni_a_id = request_uni_a.get("port_id")
            if request_uni_a_id is None:
                request_uni_a_id = request_uni_a.get("id")
            request_uni_z = request_endpoints[1]
            request_uni_z_id = request_uni_z.get("port_id")
            if request_uni_z_id is None:
                request_uni_z_id = request_uni_z.get("id")
        else:  # spec version 1.0.0
            request_uni_a = request_dict.get("ingress_port")
            request_uni_a_id = request_uni_a.get("id")
            request_uni_z = request_dict.get("egress_port")
            request_uni_z_id = request_uni_z.get("id")

    response = {}

    for domain, breakdown in domains.items():
        uni_a_port = breakdown.get("uni_a").get("port_id")
        uni_a_vlan = breakdown.get("uni_a").get("tag").get("value")

        endpoint_a = {
            "port_id": uni_a_port,
            "vlan": str(uni_a_vlan),
        }

        endpoints.append(endpoint_a)

        if request_uni_a_id == uni_a_port and endpoint_a not in response_endpoints:
            response_endpoints.append(endpoint_a)
        if request_uni_z_id == uni_a_port and endpoint_a not in response_endpoints:
            response_endpoints.append(endpoint_a)

        uni_z_port = breakdown.get("uni_z").get("port_id")
        uni_z_vlan = breakdown.get("uni_z").get("tag").get("value")

        endpoint_z = {
            "port_id": uni_z_port,
            "vlan": str(uni_z_vlan),
        }

        endpoints.append(endpoint_z)

        if request_uni_a_id == uni_z_port and endpoint_z not in response_endpoints:
            response_endpoints.append(endpoint_z)
        if request_uni_z_id == uni_z_port and endpoint_z not in response_endpoints:
            response_endpoints.append(endpoint_z)

        logger.debug(
            f"endpoints info: {request_uni_a_id}, {request_uni_z_id}, "
            f"{uni_a_port}, {uni_z_port}"
        )

    # TODO: we're missing many of the attributes in the response here
    # which have been specified in the provisioning spec, such as:
    # name, description, qos_metrics, notifications, ownership,
    # creation_date, archived_date, status, state, counters_location,
    # last_modified, current_path, oxp_service_ids.  Implementing each
    # of them would be worth a separate ticket, so we'll just make do
    # with this minimal response for now.
    response[service_id] = {
        "service_id": service_id,
        "name": name,
        "description": description,
        "endpoints": response_endpoints,
        "current_path": endpoints,
        "archived_date": 0,
    }
    if qos_metrics:
        response[service_id]["qos_metrics"] = qos_metrics

    if scheduling:
        response[service_id]["scheduling"] = scheduling

    if notifications:
        response[service_id]["notifications"] = notifications

    if oxp_response_code:
        response[service_id]["oxp_response_code"] = oxp_response_code

    if oxp_response:
        response[service_id]["oxp_response"] = oxp_response

    logger.info(f"Formed a response: {response}")

    return response
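
Since several of the archive/history paths above are reported uncovered, here is a minimal sketch of how they might be exercised with an in-memory stand-in for the MongoDB-backed db_instance. The FakeDB class and its storage layout are illustrative assumptions for this sketch, not part of the controller.

import json

from sdx_datamodel.constants import MongoCollections

from sdx_controller.handlers.connection_handler import ConnectionHandler


class FakeDB:
    """In-memory stand-in (hypothetical) mimicking the db_instance calls used above."""

    def __init__(self):
        self._collections = {}

    def read_from_db(self, collection, key):
        value = self._collections.get(collection, {}).get(key)
        return {key: value} if value is not None else None

    def add_key_value_pair_to_db(self, collection, key, value):
        self._collections.setdefault(collection, {})[key] = value

    def delete_one_entry(self, collection, key):
        self._collections.get(collection, {}).pop(key, None)


db = FakeDB()
handler = ConnectionHandler(db)

# Connection requests are stored as JSON strings keyed by service id,
# matching what archive_connection() and remove_connection() expect.
db.add_key_value_pair_to_db(
    MongoCollections.CONNECTIONS,
    "svc-1",
    json.dumps({"id": "svc-1", "name": "new-connection"}),
)

handler.archive_connection("svc-1")
print(handler.get_archived_connections("svc-1"))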