• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

atlanticwave-sdx / sdx-controller / 16630934709

30 Jul 2025 06:39PM UTC coverage: 54.65% (-0.9%) from 55.53%
16630934709

Pull #477

github

web-flow
Merge 62d1648e1 into 36b845ee6
Pull Request #477: Update to 2.0 API format

13 of 14 new or added lines in 1 file covered. (92.86%)

27 existing lines in 2 files now uncovered.

1187 of 2172 relevant lines covered (54.65%)

1.09 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.28
/sdx_controller/handlers/connection_handler.py
1
import json
2✔
2
import logging
2✔
3
import time
2✔
4
import traceback
2✔
5
from typing import Tuple
2✔
6

7
from sdx_datamodel.connection_sm import ConnectionStateMachine
2✔
8
from sdx_datamodel.constants import Constants, MessageQueueNames, MongoCollections
2✔
9
from sdx_datamodel.parsing.exceptions import (
2✔
10
    AttributeNotSupportedException,
11
    ServiceNotSupportedException,
12
)
13
from sdx_pce.load_balancing.te_solver import TESolver
2✔
14
from sdx_pce.topology.temanager import TEManager
2✔
15
from sdx_pce.utils.exceptions import (
2✔
16
    RequestValidationError,
17
    SameSwitchRequestError,
18
    TEError,
19
)
20

21
from sdx_controller.messaging.topic_queue_producer import TopicQueueProducer
2✔
22
from sdx_controller.models.simple_link import SimpleLink
2✔
23
from sdx_controller.utils.parse_helper import ParseHelper
2✔
24

25
logger = logging.getLogger(__name__)
# The message-queue client library ("pika") logs very verbosely at INFO;
# keep only its warnings and above.
logging.getLogger("pika").setLevel(logging.WARNING)
27

28

29
class ConnectionHandler:
    """Provision, remove, and repair inter-domain L2VPN connections.

    Responsibilities:

    - turn PCE-computed per-domain "breakdowns" of a connection request
      into messages published to the local controllers (LCs);
    - keep DB bookkeeping in sync: which connections use which ports and
      links, stored breakdowns, and archived connection records;
    - react to topology events (link failures/removals, UNI port state
      changes) by updating or re-placing the affected connections.
    """

    def __init__(self, db_instance):
        # DB facade shared with the rest of the controller.
        self.db_instance = db_instance
        self.parse_helper = ParseHelper()

    def _process_port(self, connection_service_id, port_id, operation):
        """Track ("post") or untrack ("delete") a connection on a port.

        Each port document in the PORTS collection carries the list of
        service ids currently traversing that port; `port_id` is used as
        the DB key.
        """
        port_in_db = self.db_instance.read_from_db(MongoCollections.PORTS, port_id)

        if not port_in_db:
            port_in_db = {}

        if Constants.PORT_CONNECTIONS_DICT not in port_in_db:
            port_in_db[Constants.PORT_CONNECTIONS_DICT] = []

        connections = port_in_db[Constants.PORT_CONNECTIONS_DICT]

        if operation == "post" and connection_service_id:
            if connection_service_id not in connections:
                connections.append(connection_service_id)

        if operation == "delete" and connection_service_id:
            if connection_service_id in connections:
                connections.remove(connection_service_id)

        self.db_instance.add_key_value_pair_to_db(
            MongoCollections.PORTS, port_id, port_in_db
        )

    def _process_link_connection_dict(
        self, link_connections_dict, simple_link, connection_service_id, operation
    ):
        """Track ("post") or untrack ("delete") a connection on a link.

        `link_connections_dict` maps a SimpleLink string to the list of
        service ids using that link; it is mutated in place.
        """
        if simple_link not in link_connections_dict:
            link_connections_dict[simple_link] = []

        service_ids = link_connections_dict[simple_link]

        if operation == "post" and connection_service_id:
            if connection_service_id not in service_ids:
                service_ids.append(connection_service_id)

        if operation == "delete" and connection_service_id:
            if connection_service_id in service_ids:
                service_ids.remove(connection_service_id)

    def _send_breakdown_to_lc(self, breakdown, operation, connection_request):
        """Publish each domain's slice of a breakdown to its LC queue.

        Also updates per-port and per-link usage bookkeeping in the DB.
        Returns a (reason, HTTP status) tuple.
        """
        logger.debug(f"BREAKDOWN: {json.dumps(breakdown)}")

        if breakdown is None:
            return "Could not break down the solution", 400

        link_connections_dict_json = self.db_instance.get_value_from_db(
            MongoCollections.LINKS, Constants.LINK_CONNECTIONS_DICT
        )

        link_connections_dict = (
            json.loads(link_connections_dict_json) if link_connections_dict_json else {}
        )

        interdomain_a, interdomain_b = None, None
        connection_service_id = connection_request.get("id")

        for domain, link in breakdown.items():
            port_list = []
            link_with_new_format = {}
            for key in link.keys():
                if "uni_" in key and "port_id" in link[key]:
                    port_list.append(
                        {
                            "port_id": link[key]["port_id"],
                            "vlan_value": link[key].get("tag", {}).get("value"),
                        }
                    )

            # FIX: simple_link used to be assigned only inside the
            # `if port_list:` branch below, so a domain entry without
            # ports would either raise UnboundLocalError (first
            # iteration) or silently reuse the previous iteration's
            # link.  Reset it per iteration and skip the bookkeeping
            # when there is no link to record.
            simple_link = None

            if port_list:
                link_with_new_format["name"] = link.get("name", "")
                link_with_new_format["endpoints"] = []
                for port in port_list:
                    # FIX: pass the port id (the DB key expected by
                    # _process_port), not the whole port dict.
                    self._process_port(
                        connection_service_id, port.get("port_id"), operation
                    )
                    if port.get("vlan_value"):
                        link_with_new_format["endpoints"].append(
                            {
                                "port_id": port.get("port_id"),
                                "vlan": port.get("vlan_value"),
                            }
                        )
                port_id_list = [port.get("port_id") for port in port_list]
                simple_link = SimpleLink(port_id_list).to_string()

            if simple_link is not None:
                self._process_link_connection_dict(
                    link_connections_dict, simple_link, connection_service_id, operation
                )

            # Pair up inter-domain ports across consecutive domain entries.
            if interdomain_a:
                interdomain_b = link.get("uni_a", {}).get("port_id")
            else:
                interdomain_a = link.get("uni_z", {}).get("port_id")

            if interdomain_a and interdomain_b:
                simple_link = SimpleLink([interdomain_a, interdomain_b]).to_string()
                self._process_link_connection_dict(
                    link_connections_dict, simple_link, connection_service_id, operation
                )
                interdomain_a = link.get("uni_z", {}).get("port_id")

            self.db_instance.add_key_value_pair_to_db(
                MongoCollections.LINKS,
                Constants.LINK_CONNECTIONS_DICT,
                json.dumps(link_connections_dict),
            )

            logger.debug(
                f"Attempting to publish domain: {domain}, link: {link_with_new_format}"
            )

            # From "urn:ogf:network:sdx:topology:amlight.net", attempt to
            # extract a string like "amlight".
            domain_name = self.parse_helper.find_domain_name(domain, ":") or f"{domain}"
            exchange_name = MessageQueueNames.CONNECTIONS

            logger.debug(
                f"Doing '{operation}' operation for '{link_with_new_format}' with exchange_name: {exchange_name}, "
                f"routing_key: {domain_name}"
            )
            mq_link = {
                "operation": operation,
                "service_id": connection_service_id,
                "link": link_with_new_format,
            }
            producer = TopicQueueProducer(
                timeout=5, exchange_name=exchange_name, routing_key=domain_name
            )
            producer.call(json.dumps(mq_link))
            producer.stop_keep_alive()

        # We will get to this point only if all the previous steps
        # leading up to this point were successful.
        return "Connection published", 201

    def _provision_breakdown(self, te_manager, breakdown, connection_request):
        """Persist a breakdown, publish it to the LCs, refresh topology.

        Shared tail of the same-switch and general paths of
        place_connection().  Returns (reason, HTTP status).
        """
        self.db_instance.add_key_value_pair_to_db(
            MongoCollections.BREAKDOWNS, connection_request["id"], breakdown
        )
        status, code = self._send_breakdown_to_lc(
            breakdown, "post", connection_request
        )
        logger.debug(f"Breakdown sent to LC, status: {status}, code: {code}")
        # update topology in DB with updated states (bandwidth and available vlan pool)
        topology_db_update(self.db_instance, te_manager)
        return status, code

    def place_connection(
        self, te_manager: "TEManager", connection_request: dict
    ) -> Tuple[str, int]:
        """
        Do the actual work of creating a connection.

        This method will call pce library to generate a breakdown
        across relevant domains, and then send individual connection
        requests to each of those domains.

        Note that we can return early if things fail.  Return value is
        a tuple of the form (reason, HTTP code).
        """
        graph = te_manager.generate_graph_te()
        if graph is None:
            return "No SDX topology found", 424

        try:
            traffic_matrix = te_manager.generate_traffic_matrix(
                connection_request=connection_request
            )
        except RequestValidationError as request_err:
            err = traceback.format_exc().replace("\n", ", ")
            logger.error(
                f"Error when parsing and validating request: {request_err} - {err}"
            )
            return f"Error: {request_err}", request_err.request_code
        except ServiceNotSupportedException as service_err:
            err = traceback.format_exc().replace("\n", ", ")
            logger.error(
                f"Error when parsing and validating request: {service_err} - {err}"
            )
            return f"Error: {service_err}", 402
        except AttributeNotSupportedException as attr_err:
            err = traceback.format_exc().replace("\n", ", ")
            logger.error(
                f"Error when parsing and validating request: {attr_err} - {err}"
            )
            return f"Error: {attr_err}", 422
        except SameSwitchRequestError as ctx:
            # Both endpoints are on the same switch: no path computation
            # is needed, so build the breakdown directly and publish it.
            logger.debug(
                f"{str(ctx)},{ctx.request_id},{ctx.domain_id},{ctx.ingress_port},{ctx.egress_port}, {ctx.ingress_user_port_tag}, {ctx.egress_user_port_tag}"
            )
            try:
                breakdown = te_manager.generate_connection_breakdown_same_switch(
                    ctx.request_id,
                    ctx.domain_id,
                    ctx.ingress_port,
                    ctx.egress_port,
                    ctx.ingress_user_port_tag,
                    ctx.egress_user_port_tag,
                )
                return self._provision_breakdown(
                    te_manager, breakdown, connection_request
                )
            except TEError as te_err:
                # We could probably return te_err.te_code instead of 400,
                # but I don't think PCE should use HTTP error codes,
                # because that violates abstraction boundaries.
                return f"PCE error: {te_err}", te_err.te_code
            except Exception as e:
                err = traceback.format_exc().replace("\n", ", ")
                logger.error(f"Error when generating/publishing breakdown: {e} - {err}")
                return f"Error: {e}", 410

        # General case: traffic_matrix is not None
        if traffic_matrix is None:
            return (
                "Request does not have a valid JSON or body is incomplete/incorrect",
                400,
            )

        logger.info(f"Generated graph: '{graph}', traffic matrix: '{traffic_matrix}'")
        try:
            conn = te_manager.requests_connectivity(traffic_matrix)
            if conn is False:
                logger.error(f"Graph connectivity: {conn}")
                raise TEError("No path is available, the graph is not connected", 412)
        except TEError as te_err:
            return f"PCE error: {te_err}", te_err.te_code

        solver = TESolver(graph, traffic_matrix)
        solution = solver.solve()
        logger.debug(f"TESolver result: {solution}")

        if solution is None or solution.connection_map is None:
            return "Could not solve the request", 410

        try:
            breakdown = te_manager.generate_connection_breakdown(
                solution, connection_request
            )
            return self._provision_breakdown(te_manager, breakdown, connection_request)
        except TEError as te_err:
            # We could probably return te_err.te_code instead of 400,
            # but I don't think PCE should use HTTP error codes,
            # because that violates abstraction boundaries.
            return f"PCE error: {te_err}", te_err.te_code
        except Exception as e:
            err = traceback.format_exc().replace("\n", ", ")
            logger.error(f"Error when generating/publishing breakdown: {e} - {err}")
            return f"Error: {e}", 410

    def archive_connection(self, service_id) -> None:
        """Move a connection from CONNECTIONS to HISTORICAL_CONNECTIONS.

        The archived entry is stored as {unix-timestamp: request},
        appended to any previously archived entries for the same id.
        No-op if the connection is not in the DB.
        """
        connection_request = self.db_instance.get_value_from_db(
            MongoCollections.CONNECTIONS, service_id
        )
        if not connection_request:
            return

        # (FIX: dropped a no-op `connection_request = connection_request`
        # self-assignment that was here.)
        self.db_instance.delete_one_entry(MongoCollections.CONNECTIONS, service_id)

        historical_connections_list = self.db_instance.get_value_from_db(
            MongoCollections.HISTORICAL_CONNECTIONS, service_id
        )
        # Current timestamp in seconds
        timestamp = int(time.time())

        # Append to the existing archive list, or start a new one.
        if not historical_connections_list:
            historical_connections_list = []
        historical_connections_list.append({timestamp: connection_request})
        self.db_instance.add_key_value_pair_to_db(
            MongoCollections.HISTORICAL_CONNECTIONS,
            service_id,
            historical_connections_list,
        )
        logger.debug(f"Archived connection: {service_id}")

    def remove_connection(self, te_manager, service_id) -> Tuple[str, int]:
        """Tear down a connection: notify the LCs, then archive the record.

        Returns (reason, HTTP status).
        """
        te_manager.delete_connection(service_id)
        connection_request = self.db_instance.get_value_from_db(
            MongoCollections.CONNECTIONS, service_id
        )

        if not connection_request:
            return "Did not find connection request, cannot remove connection", 404

        oxp_response = connection_request.get("oxp_response")

        # evc_id is the service_id in the OXP response, it differs from the service_id in the connection.
        evc_id = oxp_response.get("service_id", None) if oxp_response else None

        if not oxp_response or not evc_id:
            return (
                "Connection does not have OXP response, cannot remove connection",
                404,
            )

        breakdown = self.db_instance.get_value_from_db(
            MongoCollections.BREAKDOWNS, service_id
        )
        if not breakdown:
            return "Did not find breakdown, cannot remove connection", 404

        try:
            # LCs identify the circuit by the OXP-assigned evc_id.
            breakdown["evc_id"] = evc_id
            status, code = self._send_breakdown_to_lc(
                breakdown, "delete", connection_request
            )
            self.db_instance.delete_one_entry(MongoCollections.BREAKDOWNS, service_id)
            self.archive_connection(service_id)
            logger.debug(f"Breakdown sent to LC, status: {status}, code: {code}")
            # update topology in DB with updated states (bandwidth and available vlan pool)
            topology_db_update(self.db_instance, te_manager)
            return status, code
        except Exception as e:
            logger.debug(f"Error when removing breakdown: {e}")
            return f"Error when removing breakdown: {e}", 400

    def handle_link_removal(self, te_manager, removed_links):
        """Treat removed links as failed and re-route their connections."""
        logger.debug("Handling connections that contain removed links.")
        failed_links = [{"id": link.id, "ports": link.ports} for link in removed_links]
        self.handle_link_failure(te_manager, failed_links)

    def handle_link_failure(self, te_manager, failed_links):
        """Re-place every connection that traverses a failed link.

        Each affected connection is marked ERROR, removed, marked
        RECOVERING, and then re-submitted through place_connection().
        Failures for individual connections are logged and skipped so
        the remaining connections are still processed.
        """
        logger.debug("Handling connections that contain failed links.")
        link_connections_dict = self.db_instance.get_value_from_db(
            MongoCollections.LINKS, Constants.LINK_CONNECTIONS_DICT
        )

        if not link_connections_dict:
            logger.debug("No connection has been placed yet.")
            return

        link_connections_dict = json.loads(link_connections_dict)

        for link in failed_links:
            logger.info(f"Handling link failure on {link['id']}")
            port_list = []
            if "ports" not in link:
                continue
            for port in link["ports"]:
                # Ports may be given either as bare id strings or dicts.
                port_id = port if isinstance(port, str) else port.get("id")
                if not port_id:
                    continue
                port_list.append(port_id)

            simple_link = SimpleLink(port_list).to_string()

            if simple_link in link_connections_dict:
                logger.debug("Found failed link record!")
                service_ids = link_connections_dict[simple_link]
                for index, service_id in enumerate(service_ids):
                    logger.info(
                        f"Connection {service_id} affected by link {link['id']}"
                    )
                    connection = self.db_instance.get_value_from_db(
                        MongoCollections.CONNECTIONS, service_id
                    )
                    if not connection:
                        logger.debug(f"Did not find connection from db: {service_id}")
                        continue

                    try:
                        logger.debug(f"Link Failure: Removing connection: {connection}")
                        if connection.get("status") is None:
                            connection["status"] = str(
                                ConnectionStateMachine.State.ERROR
                            )
                        else:
                            connection, _ = connection_state_machine(
                                connection, ConnectionStateMachine.State.ERROR
                            )
                        logger.info(
                            f"Removing connection: {service_id} {connection.get('status')}"
                        )
                        # NOTE(review): uses connection["id"] rather than
                        # service_id; presumably the same value — confirm.
                        self.remove_connection(te_manager, connection["id"])
                    except Exception as err:
                        logger.info(
                            f"Encountered error when deleting connection: {err}"
                        )
                        continue

                    logger.debug("Removed connection:")
                    logger.debug(connection)
                    connection, _ = connection_state_machine(
                        connection, ConnectionStateMachine.State.RECOVERING
                    )
                    connection["oxp_success_count"] = 0
                    self.db_instance.add_key_value_pair_to_db(
                        MongoCollections.CONNECTIONS, service_id, connection
                    )
                    _reason, code = self.place_connection(te_manager, connection)
                    if code // 100 != 2:
                        # Re-placement failed; park the connection in ERROR.
                        connection, _ = connection_state_machine(
                            connection, ConnectionStateMachine.State.ERROR
                        )
                        self.db_instance.add_key_value_pair_to_db(
                            MongoCollections.CONNECTIONS,
                            service_id,
                            connection,
                        )

                    logger.info(
                        f"place_connection result: ID: {service_id} reason='{_reason}', code={code}"
                    )

    def _update_connections_status_for_ports(self, ports, status):
        """Set `status` on every connection that uses one of `ports`.

        Shared implementation for the UNI port up/down handlers.
        """
        for port in ports:
            port_in_db = self.db_instance.get_value_from_db(
                MongoCollections.PORTS, port.id
            )

            if not (port_in_db and Constants.PORT_CONNECTIONS_DICT in port_in_db):
                continue

            logger.debug("Found port record!")
            for service_id in port_in_db[Constants.PORT_CONNECTIONS_DICT]:
                connection = self.db_instance.get_value_from_db(
                    MongoCollections.CONNECTIONS, service_id
                )
                if not connection:
                    logger.debug(f"Cannot find connection {service_id} in DB.")
                    continue
                logger.info(
                    f"Updating connection {service_id} status to '{status.lower()}'."
                )
                connection["status"] = status
                self.db_instance.add_key_value_pair_to_db(
                    MongoCollections.CONNECTIONS, service_id, connection
                )
                logger.debug(f"Connection status updated for {service_id}")

    def handle_uni_ports_up_to_down(self, uni_ports_up_to_down):
        """
        Handles the transition of UNI ports from 'up' to 'down' status.
        Every connection whose endpoints appear in `uni_ports_up_to_down`
        has its status in the DB changed to 'DOWN'.
        Args:
            uni_ports_up_to_down (list): A list of UNI port identifiers whose status
                                         needs to be updated to 'down'.
        Returns:
            None
        """
        self._update_connections_status_for_ports(uni_ports_up_to_down, "DOWN")

    def handle_uni_ports_down_to_up(self, uni_ports_down_to_up):
        """
        Handles the transition of UNI ports from 'down' to 'up' status.
        Every connection whose endpoints appear in `uni_ports_down_to_up`
        has its status in the DB changed to 'UP'.
        Args:
            uni_ports_down_to_up (list): A list of UNI port identifiers whose status
                                         needs to be updated to 'up'.
        Returns:
            None
        """
        self._update_connections_status_for_ports(uni_ports_down_to_up, "UP")

    def get_archived_connections(self, service_id: str):
        """Return the archived entries for `service_id`, or None if absent."""
        historical_connections = self.db_instance.get_value_from_db(
            MongoCollections.HISTORICAL_CONNECTIONS, service_id
        )
        if not historical_connections:
            return None
        return historical_connections
534

535

536
def topology_db_update(db_instance, te_manager):
    """Persist every per-domain OXP topology and the merged latest view."""
    # Each domain's topology is stored under its own domain name.
    topology_map = te_manager.topology_manager.get_topology_map()
    for name, topology in topology_map.items():
        db_instance.add_key_value_pair_to_db(
            MongoCollections.TOPOLOGIES, name, topology.to_dict()
        )

    # The merged full topology goes under the well-known "latest" key.
    merged = te_manager.topology_manager.get_topology().to_dict()
    db_instance.add_key_value_pair_to_db(
        MongoCollections.TOPOLOGIES, Constants.LATEST_TOPOLOGY, merged
    )
    logger.info("Save to database complete.")
550

551

552
def get_connection_status(db, service_id: str):
    """
    Form a response to `GET /l2vpn/1.0/{service_id}`.

    Reads the stored breakdown and original request for `service_id` and
    shapes them into the provisioning-spec response format.  Returns an
    empty dict when no breakdown (or no per-service entry in it) exists.
    """
    assert db is not None
    assert service_id is not None

    breakdown = db.read_from_db(MongoCollections.BREAKDOWNS, service_id)
    if not breakdown:
        logger.info(f"Could not find breakdown for {service_id}")
        return {}

    logger.info(f"breakdown for {service_id}: {breakdown}")

    # The breakdown we read from DB is in this shape:
    #
    # {
    #     "_id": ObjectId("66ec71770c7022eb0922f41a"),
    #     "5b7df397-2269-489b-8e03-f256461265a0": {
    #         "urn:sdx:topology:amlight.net": {
    #             "name": "AMLIGHT_vlan_1000_10001",
    #             "dynamic_backup_path": True,
    #             "uni_a": {
    #                 "tag": {"value": 1000, "tag_type": 1},
    #                 "port_id": "urn:sdx:port:amlight.net:A1:1",
    #             },
    #             "uni_z": {
    #                 "tag": {"value": 10001, "tag_type": 1},
    #                 "port_id": "urn:sdx:port:amlight.net:B1:3",
    #             },
    #         }
    #     },
    # }
    #
    # We need to shape that into this form, at a minimum:
    #
    # {
    #     "c73da8e1-5d03-4620-a1db-7cdf23e8978c": {
    #         "service_id": "c73da8e1-5d03-4620-a1db-7cdf23e8978c",
    #         "name": "new-connection",
    #         "endpoints": [
    #          {
    #             "port_id": "urn:sdx:port:amlight.net:A1:1",
    #             "vlan": "150"
    #          },
    #          {
    #             "port_id": "urn:sdx:port:amlight:B1:1",
    #             "vlan": "300"}
    #         ],
    #     }
    # }
    #
    # See https://sdx-docs.readthedocs.io/en/latest/specs/provisioning-api-1.0.html#request-format-2
    #

    domains = breakdown.get(service_id)
    if not domains:
        # FIX: previously a missing/empty per-service entry crashed on
        # domains.keys(); treat it like a missing breakdown.
        logger.error(f"Could not find breakdown contents for {service_id}")
        return {}
    logger.info(f"domains for {service_id}: {domains.keys()}")

    # Defaults used when the original connection request for this
    # service_id cannot be found.
    name = "unknown"
    description = "unknown"
    status = "unknown"
    qos_metrics = {}
    scheduling = {}
    notifications = {}
    # FIX: oxp_response was previously assigned only when the request was
    # found, causing an UnboundLocalError near the end otherwise.
    oxp_response = None

    endpoints = list()
    request_endpoints = []
    response_endpoints = []
    request_uni_a_id = None
    request_uni_z_id = None

    request = db.read_from_db(MongoCollections.CONNECTIONS, service_id)
    if not request:
        logger.error(f"Can't find a connection request for {service_id}")
        # TODO: we're in a strange state here. Should we panic?
    else:
        logger.info(f"Found request for {service_id}: {request}")
        # We seem to have saved the original request in the form of a
        # string into the DB, not a record.
        request_dict = request.get(service_id)
        name = request_dict.get("name")
        description = request_dict.get("description")
        status = request_dict.get("status")
        qos_metrics = request_dict.get("qos_metrics")
        scheduling = request_dict.get("scheduling")
        notifications = request_dict.get("notifications")
        oxp_response = request_dict.get("oxp_response")
        status = parse_conn_status(status)
        if request_dict.get("endpoints") is not None:  # spec version 2.0.0
            request_endpoints = request_dict.get("endpoints")
            request_uni_a = request_endpoints[0]
            request_uni_a_id = request_uni_a.get("port_id")
            if request_uni_a_id is None:
                request_uni_a_id = request_uni_a.get("id")
            request_uni_z = request_endpoints[1]
            request_uni_z_id = request_uni_z.get("port_id")
            if request_uni_z_id is None:
                request_uni_z_id = request_uni_z.get("id")
        else:  # spec version 1.0.0
            request_uni_a = request_dict.get("ingress_port")
            request_uni_a_id = request_uni_a.get("id")
            request_uni_z = request_dict.get("egress_port")
            request_uni_z_id = request_uni_z.get("id")

    response = {}

    def _record_endpoint(endpoint):
        # An endpoint matching the original request is reported only once.
        if endpoint not in response_endpoints:
            response_endpoints.append(endpoint)

    # FIX: loop variable renamed from `breakdown`, which shadowed the
    # outer breakdown document read from the DB.
    for domain, domain_breakdown in domains.items():
        uni_a_port = domain_breakdown.get("uni_a").get("port_id")
        uni_a_vlan = domain_breakdown.get("uni_a").get("tag").get("value")

        endpoint_a = {
            "port_id": uni_a_port,
            "vlan": str(uni_a_vlan),
        }

        endpoints.append(endpoint_a)

        if request_uni_a_id == uni_a_port or request_uni_z_id == uni_a_port:
            _record_endpoint(endpoint_a)

        uni_z_port = domain_breakdown.get("uni_z").get("port_id")
        uni_z_vlan = domain_breakdown.get("uni_z").get("tag").get("value")

        endpoint_z = {
            "port_id": uni_z_port,
            "vlan": str(uni_z_vlan),
        }

        endpoints.append(endpoint_z)

        if request_uni_a_id == uni_z_port or request_uni_z_id == uni_z_port:
            _record_endpoint(endpoint_z)

        # FIX: leftover debugging print() replaced with logger.debug().
        logger.debug(
            f"endpoints info: {request_uni_a_id}, {request_uni_z_id}, {uni_a_port}, {uni_z_port}"
        )

    # TODO: we're missing many of the attributes in the response here
    # which have been specified in the provisioning spec, such as:
    # name, description, qos_metrics, notifications, ownership,
    # creation_date, archived_date, status, state, counters_location,
    # last_modified, current_path, oxp_service_ids.  Implementing each
    # of them would be worth a separate ticket each, so we'll just
    # make do with this minimal response for now.
    response[service_id] = {
        "service_id": service_id,
        "name": name,
        "description": description,
        # FIX: the dict literal previously listed "status" twice.
        "status": status,
        "endpoints": response_endpoints,
        "current_path": endpoints,
        "archived_date": 0,
    }
    if qos_metrics:
        response[service_id]["qos_metrics"] = qos_metrics

    if scheduling:
        response[service_id]["scheduling"] = scheduling

    if notifications:
        response[service_id]["notifications"] = notifications

    if oxp_response:
        response[service_id]["oxp_response"] = oxp_response

    logger.info(f"Formed a response: {response}")

    return response
742

743

744
def connection_state_machine(connection, new_state):
    """Transition *connection*'s status through the state machine.

    Loads the connection's current status into a fresh
    ConnectionStateMachine, applies the transition to `new_state`, writes
    the resulting state back into connection["status"], and returns the
    (mutated) connection along with the machine.
    """
    machine = ConnectionStateMachine()
    current_state = machine.State[connection.get("status")]
    machine.set_state(current_state)
    machine.transition(new_state)
    connection["status"] = str(machine.get_state())
    return connection, machine
752

753

754
def parse_conn_status(conn_state):
    """Translate an internal connection state into a spec status string.

    Statuses follow the Provisioning Data Model Spec 1.0a:
    - up: if the L2VPN is operational
    - down: if the L2VPN is not operational due to topology issues/lack of path, or endpoints being down,
    - error: when there is an error with the L2VPN,
    - under provisioning: when the L2VPN is still being provisioned by the OXPs
    - maintenance: when the L2VPN is being affected by a network maintenance

    Unrecognized states map to "error".
    """
    if conn_state == "UP":
        return "up"
    if conn_state in ("UNDER_PROVISIONING", "MODIFYING"):
        return "under provisioning"
    if conn_state in ("RECOVERING", "DOWN", "ERROR"):
        return "down"
    return "error"
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc