• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

atlanticwave-sdx / sdx-controller / 14672831121

25 Apr 2025 08:21PM UTC coverage: 56.078% (-0.07%) from 56.145%
14672831121

Pull #453

github

web-flow
Merge 6b07ad640 into 03aa84af5
Pull Request #453: Assign oxp_success_count to 0 only if it does not exist

31 of 53 new or added lines in 4 files covered. (58.49%)

1 existing line in 1 file now uncovered.

1181 of 2106 relevant lines covered (56.08%)

1.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.93
/sdx_controller/handlers/connection_handler.py
1
import json
2✔
2
import logging
2✔
3
import time
2✔
4
import traceback
2✔
5
from typing import Tuple
2✔
6

7
from sdx_datamodel.connection_sm import ConnectionStateMachine
2✔
8
from sdx_datamodel.constants import Constants, MessageQueueNames, MongoCollections
2✔
9
from sdx_datamodel.parsing.exceptions import (
2✔
10
    AttributeNotSupportedException,
11
    ServiceNotSupportedException,
12
)
13
from sdx_pce.load_balancing.te_solver import TESolver
2✔
14
from sdx_pce.topology.temanager import TEManager
2✔
15
from sdx_pce.utils.exceptions import (
2✔
16
    RequestValidationError,
17
    SameSwitchRequestError,
18
    TEError,
19
)
20

21
from sdx_controller.messaging.topic_queue_producer import TopicQueueProducer
2✔
22
from sdx_controller.models.simple_link import SimpleLink
2✔
23
from sdx_controller.utils.parse_helper import ParseHelper
2✔
24

25
logger = logging.getLogger(__name__)
2✔
26
logging.getLogger("pika").setLevel(logging.WARNING)
2✔
27

28

29
class ConnectionHandler:
    """Handle L2VPN connection requests: placement, removal, archiving."""

    def __init__(self, db_instance):
        # Helper for parsing domain names out of topology URNs.
        self.parse_helper = ParseHelper()
        # DB handle used to persist connections, breakdowns, and link state.
        self.db_instance = db_instance
33

34
    def _send_breakdown_to_lc(self, breakdown, operation, connection_request):
        """
        Publish a per-domain connection breakdown to the local controllers.

        While walking the breakdown, the link -> connection-ids map is
        maintained in the DB, then one message per domain is published
        on the connections exchange.

        :param breakdown: {domain_urn: per-domain link description},
            or None when the solution could not be broken down.
        :param operation: "post" to add the service, "delete" to remove it.
        :param connection_request: the original request dict; its "id" is
            the service id recorded against each link.
        :return: (reason, HTTP status code) tuple.
        """
        logger.debug(f"-- BREAKDOWN: {json.dumps(breakdown)}")

        if breakdown is None:
            return "Could not break down the solution", 400

        link_connections_dict_json = self.db_instance.read_from_db(
            MongoCollections.LINKS, Constants.LINK_CONNECTIONS_DICT
        )

        link_connections_dict = (
            json.loads(link_connections_dict_json[Constants.LINK_CONNECTIONS_DICT])
            if link_connections_dict_json
            else {}
        )

        interdomain_a, interdomain_b = None, None
        connection_service_id = connection_request.get("id")

        for domain, link in breakdown.items():
            # Intra-domain link: every uni_* endpoint in this domain.
            port_list = [
                link[key]["port_id"]
                for key in link
                if "uni_" in key and "port_id" in link[key]
            ]

            if port_list:
                simple_link = SimpleLink(port_list).to_string()
                self._update_link_connections(
                    link_connections_dict,
                    simple_link,
                    operation,
                    connection_service_id,
                )

            # Track the inter-domain link formed by the previous domain's
            # uni_z and this domain's uni_a.
            if interdomain_a:
                interdomain_b = link.get("uni_a", {}).get("port_id")
            else:
                interdomain_a = link.get("uni_z", {}).get("port_id")

            if interdomain_a and interdomain_b:
                simple_link = SimpleLink([interdomain_a, interdomain_b]).to_string()
                self._update_link_connections(
                    link_connections_dict,
                    simple_link,
                    operation,
                    connection_service_id,
                )
                # This domain's uni_z starts the next inter-domain link.
                interdomain_a = link.get("uni_z", {}).get("port_id")

            # Persist after each domain so the stored map stays consistent
            # with what has actually been published so far.
            self.db_instance.add_key_value_pair_to_db(
                MongoCollections.LINKS,
                Constants.LINK_CONNECTIONS_DICT,
                json.dumps(link_connections_dict),
            )

            logger.debug(f"Attempting to publish domain: {domain}, link: {link}")

            # From "urn:ogf:network:sdx:topology:amlight.net", attempt to
            # extract a string like "amlight".
            domain_name = self.parse_helper.find_domain_name(domain, ":") or f"{domain}"
            exchange_name = MessageQueueNames.CONNECTIONS

            logger.debug(
                f"Doing '{operation}' operation for '{link}' with exchange_name: {exchange_name}, "
                f"routing_key: {domain_name}"
            )
            mq_link = {
                "operation": operation,
                "service_id": connection_service_id,
                "link": link,
            }
            producer = TopicQueueProducer(
                timeout=5, exchange_name=exchange_name, routing_key=domain_name
            )
            producer.call(json.dumps(mq_link))
            producer.stop_keep_alive()

        # We will get to this point only if all the previous steps
        # leading up to this point were successful.
        return "Connection published", 201

    def _update_link_connections(
        self, link_connections_dict, simple_link, operation, service_id
    ):
        # Add or remove service_id on the connection list for simple_link.
        # Factored out of _send_breakdown_to_lc, where this logic was
        # duplicated four times (post/delete x intra/inter-domain).
        connections = link_connections_dict.setdefault(simple_link, [])
        if operation == "post" and service_id and service_id not in connections:
            connections.append(service_id)
        if operation == "delete" and service_id and service_id in connections:
            connections.remove(service_id)
137

138
    def place_connection(
        self, te_manager: TEManager, connection_request: dict
    ) -> Tuple[str, int]:
        """
        Do the actual work of creating a connection.

        This method will call pce library to generate a breakdown
        across relevant domains, and then send individual connection
        requests to each of those domains.

        Note that we can return early if things fail.  Return value is
        a tuple of the form (reason, HTTP code).

        :param te_manager: TEManager holding current topology state.
        :param connection_request: parsed request body; must carry an
            "id" used to key the stored breakdown.
        :return: (reason, HTTP status code) tuple.
        """
        # for num, val in enumerate(te_manager.get_topology_map().values()):
        #     logger.debug(f"TE topology #{num}: {val}")

        graph = te_manager.generate_graph_te()
        if graph is None:
            return "No SDX topology found", 424
        try:
            traffic_matrix = te_manager.generate_traffic_matrix(
                connection_request=connection_request
            )
        except RequestValidationError as request_err:
            # Flatten the traceback to a single log line.
            err = traceback.format_exc().replace("\n", ", ")
            logger.error(
                f"Error when parsing and validating request: {request_err} - {err}"
            )
            return f"Error: {request_err}", request_err.request_code
        except ServiceNotSupportedException as service_err:
            err = traceback.format_exc().replace("\n", ", ")
            logger.error(
                f"Error when parsing and validating request: {service_err} - {err}"
            )
            return f"Error: {service_err}", 402
        except AttributeNotSupportedException as attr_err:
            err = traceback.format_exc().replace("\n", ", ")
            logger.error(
                f"Error when parsing and validating request: {attr_err} - {err}"
            )
            return f"Error: {attr_err}", 422
        except SameSwitchRequestError as ctx:
            # Special case: both endpoints are on the same switch, so no
            # path computation is needed; generate the breakdown directly
            # and return from inside this handler.
            logger.debug(
                f"{str(ctx)},{ctx.request_id},{ctx.domain_id},{ctx.ingress_port},{ctx.egress_port}, {ctx.ingress_user_port_tag}, {ctx.egress_user_port_tag}"
            )
            try:
                breakdown = te_manager.generate_connection_breakdown_same_switch(
                    ctx.request_id,
                    ctx.domain_id,
                    ctx.ingress_port,
                    ctx.egress_port,
                    ctx.ingress_user_port_tag,
                    ctx.egress_user_port_tag,
                )
                self.db_instance.add_key_value_pair_to_db(
                    MongoCollections.BREAKDOWNS, connection_request["id"], breakdown
                )
                status, code = self._send_breakdown_to_lc(
                    breakdown, "post", connection_request
                )
                logger.debug(f"Breakdown sent to LC, status: {status}, code: {code}")
                # update topology in DB with updated states (bandwidth and available vlan pool)
                topology_db_update(self.db_instance, te_manager)
                return status, code
            except TEError as te_err:
                # Propagate the PCE-provided code.  NOTE(review): PCE
                # returning HTTP-ish codes arguably violates abstraction
                # boundaries; kept as-is.
                return f"PCE error: {te_err}", te_err.te_code
            except Exception as e:
                err = traceback.format_exc().replace("\n", ", ")
                logger.error(f"Error when generating/publishing breakdown: {e} - {err}")
                return f"Error: {e}", 410

        # General case: traffic_matrix is not None
        if traffic_matrix is None:
            return (
                "Request does not have a valid JSON or body is incomplete/incorrect",
                400,
            )

        logger.info(f"Generated graph: '{graph}', traffic matrix: '{traffic_matrix}'")
        try:
            # Reject early when the graph cannot connect the endpoints.
            conn = te_manager.requests_connectivity(traffic_matrix)
            if conn is False:
                logger.error(f"Graph connectivity: {conn}")
                raise TEError("No path is available, the graph is not connected", 412)
        except TEError as te_err:
            return f"PCE error: {te_err}", te_err.te_code

        solver = TESolver(graph, traffic_matrix)
        solution = solver.solve()
        logger.debug(f"TESolver result: {solution}")

        if solution is None or solution.connection_map is None:
            return "Could not solve the request", 410

        try:
            breakdown = te_manager.generate_connection_breakdown(
                solution, connection_request
            )
            self.db_instance.add_key_value_pair_to_db(
                MongoCollections.BREAKDOWNS, connection_request["id"], breakdown
            )
            status, code = self._send_breakdown_to_lc(
                breakdown, "post", connection_request
            )
            logger.debug(f"Breakdown sent to LC, status: {status}, code: {code}")
            # update topology in DB with updated states (bandwidth and available vlan pool)
            topology_db_update(self.db_instance, te_manager)
            return status, code
        except TEError as te_err:
            # Propagate the PCE-provided code (see note above on
            # abstraction boundaries).
            return f"PCE error: {te_err}", te_err.te_code
        except Exception as e:
            err = traceback.format_exc().replace("\n", ", ")
            logger.error(f"Error when generating/publishing breakdown: {e} - {err}")
            return f"Error: {e}", 410
258

259
    def archive_connection(self, service_id) -> None:
        """
        Move a connection record into the historical-connections collection.

        The active record is deleted and a timestamped copy is appended
        to the service's history list (created if absent).  A no-op when
        no active record exists.
        """
        record = self.db_instance.read_from_db(
            MongoCollections.CONNECTIONS, service_id
        )
        if not record:
            return

        connection_request = record[service_id]
        self.db_instance.delete_one_entry(MongoCollections.CONNECTIONS, service_id)

        history = self.db_instance.read_from_db(
            MongoCollections.HISTORICAL_CONNECTIONS, service_id
        )
        # Current timestamp in seconds, used as the archive key.
        timestamp = int(time.time())
        entry = json.dumps({timestamp: connection_request})

        if history:
            archived = history[service_id]
            archived.append(entry)
        else:
            archived = [entry]

        self.db_instance.add_key_value_pair_to_db(
            MongoCollections.HISTORICAL_CONNECTIONS,
            service_id,
            archived,
        )
        logger.debug(f"Archived connection: {service_id}")
292

293
    def remove_connection(self, te_manager, service_id) -> Tuple[str, int]:
        """
        Tear down a connection: notify LCs, drop DB state, archive it.

        :return: (reason, HTTP status code) tuple.
        """
        te_manager.delete_connection(service_id)

        request_record = self.db_instance.read_from_db(
            MongoCollections.CONNECTIONS, service_id
        )
        if not request_record:
            return "Did not find connection request, cannot remove connection", 404
        connection_request = request_record[service_id]

        breakdown_record = self.db_instance.read_from_db(
            MongoCollections.BREAKDOWNS, service_id
        )
        if not breakdown_record:
            return "Did not find breakdown, cannot remove connection", 404
        breakdown = breakdown_record[service_id]

        try:
            status, code = self._send_breakdown_to_lc(
                breakdown, "delete", connection_request
            )
            self.db_instance.delete_one_entry(MongoCollections.BREAKDOWNS, service_id)
            self.archive_connection(service_id)
            logger.debug(f"Breakdown sent to LC, status: {status}, code: {code}")
            # update topology in DB with updated states (bandwidth and available vlan pool)
            topology_db_update(self.db_instance, te_manager)
            return status, code
        except Exception as e:
            logger.debug(f"Error when removing breakdown: {e}")
            return f"Error when removing breakdown: {e}", 400
323

324
    def handle_link_removal(self, te_manager, removed_links):
        """Treat removed links as failed and re-route affected connections."""
        logger.debug("Handling connections that contain removed links.")
        failed_links = [
            {"id": link.id, "ports": link.ports} for link in removed_links
        ]
        self.handle_link_failure(te_manager, failed_links)
331

332
    def handle_link_failure(self, te_manager, failed_links):
        """
        Re-route every connection that traverses a failed link.

        For each affected service: move it to ERROR, remove it, move it
        to RECOVERING, and attempt to place it again; on a non-2xx
        placement result the connection is stored back in ERROR state.

        :param te_manager: TEManager used for removal and re-placement.
        :param failed_links: list of {"id": ..., "ports": [...]} dicts;
            ports may be plain port-id strings or {"id": ...} dicts.
        """
        logger.debug("Handling connections that contain failed links.")
        link_connections_dict_str = self.db_instance.read_from_db(
            MongoCollections.LINKS, Constants.LINK_CONNECTIONS_DICT
        )

        # Nothing to do until at least one connection has been placed.
        if (
            not link_connections_dict_str
            or not link_connections_dict_str[Constants.LINK_CONNECTIONS_DICT]
        ):
            logger.debug("No connection has been placed yet.")
            return

        link_connections_dict = json.loads(
            link_connections_dict_str[Constants.LINK_CONNECTIONS_DICT]
        )

        for link in failed_links:
            logger.info(f"Handling link failure on {link['id']}")
            port_list = []
            if "ports" not in link:
                continue
            for port in link["ports"]:
                # Ports arrive either as bare id strings or as dicts.
                port_id = port if isinstance(port, str) else port.get("id")
                if not port_id:
                    continue
                port_list.append(port_id)

            # Key into the link -> connections map maintained by
            # _send_breakdown_to_lc.
            simple_link = SimpleLink(port_list).to_string()

            if simple_link in link_connections_dict:
                logger.debug("Found failed link record!")
                service_ids = link_connections_dict[simple_link]
                for index, service_id in enumerate(service_ids):
                    logger.info(
                        f"Connection {service_id} affected by link {link['id']}"
                    )
                    connection = self.db_instance.read_from_db(
                        MongoCollections.CONNECTIONS, service_id
                    )
                    if not connection:
                        logger.debug(f"Did not find connection from db: {service_id}")
                        continue
                    connection = connection[service_id]
                    try:
                        logger.debug(f"Link Failure: Removing connection: {connection}")
                        # Legacy records may lack a status; seed one so
                        # the state machine can operate on it later.
                        if connection.get("status") is None:
                            connection["status"] = str(
                                ConnectionStateMachine.State.ERROR
                            )
                        else:
                            connection, _ = connection_state_machine(
                                connection, ConnectionStateMachine.State.ERROR
                            )
                        logger.info(
                            f"Removing connection: {service_id} {connection.get('status')}"
                        )
                        self.remove_connection(te_manager, connection["id"])
                    except Exception as err:
                        # Best-effort: a failure on one connection must
                        # not block recovery of the others.
                        logger.info(
                            f"Encountered error when deleting connection: {err}"
                        )
                        continue

                    logger.debug("Removed connection:")
                    logger.debug(connection)
                    connection, _ = connection_state_machine(
                        connection, ConnectionStateMachine.State.RECOVERING
                    )
                    # NOTE(review): this unconditionally resets the OXP
                    # success counter before re-placing; confirm whether an
                    # existing count should instead be preserved (compare
                    # PR "Assign oxp_success_count to 0 only if it does
                    # not exist").
                    connection["oxp_success_count"] = 0
                    self.db_instance.add_key_value_pair_to_db(
                        MongoCollections.CONNECTIONS, service_id, connection
                    )
                    _reason, code = self.place_connection(te_manager, connection)
                    # Any non-2xx code means re-placement failed; park the
                    # connection in ERROR state.
                    if code // 100 != 2:
                        connection, _ = connection_state_machine(
                            connection, ConnectionStateMachine.State.ERROR
                        )
                        self.db_instance.add_key_value_pair_to_db(
                            MongoCollections.CONNECTIONS,
                            service_id,
                            connection,
                        )

                    logger.info(
                        f"place_connection result: ID: {service_id} reason='{_reason}', code={code}"
                    )

420
    def get_archived_connections(self, service_id: str):
        """Return the archived history list for service_id, or None."""
        records = self.db_instance.read_from_db(
            MongoCollections.HISTORICAL_CONNECTIONS, service_id
        )
        return records[service_id] if records else None
427

428

429
def topology_db_update(db_instance, te_manager):
    """Persist each OXP topology and the latest merged topology to the DB."""
    # One DB entry per OXP domain, keyed by domain name.
    topology_map = te_manager.topology_manager.get_topology_map()
    for domain_name, topology in topology_map.items():
        db_instance.add_key_value_pair_to_db(
            MongoCollections.TOPOLOGIES, domain_name, json.dumps(topology.to_dict())
        )
    # use 'latest_topo' as PK to save latest full topo to db
    full_topology = te_manager.topology_manager.get_topology().to_dict()
    db_instance.add_key_value_pair_to_db(
        MongoCollections.TOPOLOGIES,
        Constants.LATEST_TOPOLOGY,
        json.dumps(full_topology),
    )
    logger.info("Save to database complete.")
443

444

445
def get_connection_status(db, service_id: str):
    """
    Form a response to `GET /l2vpn/1.0/{service_id}`.

    Reads the saved breakdown and the original connection request for
    `service_id` and shapes them into the provisioning-spec response.

    :param db: DB handle exposing read_from_db().
    :param service_id: L2VPN service UUID.
    :return: {service_id: {...}} response dict, or {} when no breakdown
        is found.
    """
    assert db is not None
    assert service_id is not None

    breakdown = db.read_from_db(MongoCollections.BREAKDOWNS, service_id)
    if not breakdown:
        logger.info(f"Could not find breakdown for {service_id}")
        return {}

    logger.info(f"breakdown for {service_id}: {breakdown}")

    # The stored breakdown maps service_id -> {domain_urn: {"uni_a":
    # {"tag": ..., "port_id": ...}, "uni_z": {...}, ...}}.  We must
    # reshape it into the provisioning-spec response, at minimum:
    # {service_id: {"service_id": ..., "name": ..., "endpoints":
    # [{"port_id": ..., "vlan": ...}, ...]}}.
    #
    # See https://sdx-docs.readthedocs.io/en/latest/specs/provisioning-api-1.0.html#request-format-2

    domains = breakdown.get(service_id)
    logger.info(f"domains for {service_id}: {domains.keys()}")

    # Defaults used when the original connection request cannot be found.
    name = "unknown"
    description = "unknown"
    status = "unknown"
    qos_metrics = {}
    scheduling = {}
    notifications = {}
    # Fix: oxp_response was previously left unbound when the request
    # lookup below failed, raising NameError near the end of this
    # function.
    oxp_response = None

    endpoints = list()
    response_endpoints = []
    request_uni_a_id = None
    request_uni_z_id = None

    request = db.read_from_db(MongoCollections.CONNECTIONS, service_id)
    if not request:
        logger.error(f"Can't find a connection request for {service_id}")
        # TODO: we're in a strange state here. Should we panic?
    else:
        logger.info(f"Found request for {service_id}: {request}")
        # We seem to have saved the original request in the form of a
        # string into the DB, not a record.
        request_dict = request.get(service_id)
        name = request_dict.get("name")
        description = request_dict.get("description")
        qos_metrics = request_dict.get("qos_metrics")
        scheduling = request_dict.get("scheduling")
        notifications = request_dict.get("notifications")
        oxp_response = request_dict.get("oxp_response")
        status = parse_conn_status(request_dict.get("status"))
        if request_dict.get("endpoints") is not None:  # spec version 2.0.0
            request_endpoints = request_dict.get("endpoints")
            request_uni_a = request_endpoints[0]
            request_uni_a_id = request_uni_a.get("port_id")
            if request_uni_a_id is None:
                request_uni_a_id = request_uni_a.get("id")
            request_uni_z = request_endpoints[1]
            request_uni_z_id = request_uni_z.get("port_id")
            if request_uni_z_id is None:
                request_uni_z_id = request_uni_z.get("id")
        else:  # spec version 1.0.0
            request_uni_a = request_dict.get("ingress_port")
            request_uni_a_id = request_uni_a.get("id")
            request_uni_z = request_dict.get("egress_port")
            request_uni_z_id = request_uni_z.get("id")

    response = {}

    def _add_response_endpoint(endpoint):
        # Endpoints matching the original request, deduplicated.
        if endpoint not in response_endpoints:
            response_endpoints.append(endpoint)

    for domain, domain_breakdown in domains.items():
        uni_a_port = domain_breakdown.get("uni_a").get("port_id")
        uni_a_vlan = domain_breakdown.get("uni_a").get("tag").get("value")
        endpoint_a = {
            "port_id": uni_a_port,
            "vlan": str(uni_a_vlan),
        }
        endpoints.append(endpoint_a)

        if request_uni_a_id == uni_a_port or request_uni_z_id == uni_a_port:
            _add_response_endpoint(endpoint_a)

        uni_z_port = domain_breakdown.get("uni_z").get("port_id")
        uni_z_vlan = domain_breakdown.get("uni_z").get("tag").get("value")
        endpoint_z = {
            "port_id": uni_z_port,
            "vlan": str(uni_z_vlan),
        }
        endpoints.append(endpoint_z)

        if request_uni_a_id == uni_z_port or request_uni_z_id == uni_z_port:
            _add_response_endpoint(endpoint_z)

        # Fix: was a bare print(); route through the module logger.
        logger.debug(
            f"endpoints info: {request_uni_a_id}, {request_uni_z_id}, {uni_a_port}, {uni_z_port}"
        )

    # TODO: we're missing many of the attributes in the response here
    # which have been specified in the provisioning spec, such as:
    # ownership, creation_date, archived_date, state, counters_location,
    # last_modified, oxp_service_ids.  Implementing each of them would
    # be worth a separate ticket.
    response[service_id] = {
        "service_id": service_id,
        "name": name,
        "description": description,
        # Fix: "status" was previously duplicated in this literal.
        "status": status,
        "endpoints": response_endpoints,
        "current_path": endpoints,
        "archived_date": 0,
    }
    if qos_metrics:
        response[service_id]["qos_metrics"] = qos_metrics

    if scheduling:
        response[service_id]["scheduling"] = scheduling

    if notifications:
        response[service_id]["notifications"] = notifications

    if oxp_response:
        response[service_id]["oxp_response"] = oxp_response

    logger.info(f"Formed a response: {response}")

    return response
635

636

637
def connection_state_machine(connection, new_state):
    """
    Transition a connection record's "status" through the state machine.

    Loads the current status into a fresh ConnectionStateMachine,
    applies the transition to `new_state`, and writes the resulting
    state back into connection["status"].

    :return: (updated connection dict, the state machine instance).
    """
    machine = ConnectionStateMachine()
    current = connection.get("status")
    machine.set_state(machine.State[current])
    machine.transition(new_state)
    connection["status"] = str(machine.get_state())
    return connection, machine
645

646

647
def parse_conn_status(conn_state):
    """Parse connection from state to status as specified on the
    Provisioning Data Model Spec 1.0a. As per the spec:
    - up: if the L2VPN is operational
    - down: if the L2VPN is not operational due to topology issues/lack of path, or endpoints being down,
    - error: when there is an error with the L2VPN,
    - under provisioning: when the L2VPN is still being provisioned by the OXPs
    - maintenance: when the L2VPN is being affected by a network maintenance
    """
    # Unknown or missing states fall through to "error".
    status_by_state = dict(
        UP="up",
        UNDER_PROVISIONING="under provisioning",
        RECOVERING="down",
        DOWN="down",
        ERROR="down",
        MODIFYING="under provisioning",
    )
    return status_by_state.get(conn_state, "error")
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc