• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

atlanticwave-sdx / sdx-controller / 15905001540

26 Jun 2025 02:43PM UTC coverage: 56.324% (-0.3%) from 56.629%
15905001540

push

github

web-flow
Merge pull request #471 from atlanticwave-sdx/fix/470-avoid-load-latest-topo

Avoid loading LATEST_TOPOLOGY into TEManager

1 of 1 new or added line in 1 file covered. (100.0%)

7 existing lines in 2 files now uncovered.

1189 of 2111 relevant lines covered (56.32%)

1.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.58
/sdx_controller/handlers/connection_handler.py
1
import json
2✔
2
import logging
2✔
3
import time
2✔
4
import traceback
2✔
5
from typing import Tuple
2✔
6

7
from sdx_datamodel.connection_sm import ConnectionStateMachine
2✔
8
from sdx_datamodel.constants import Constants, MessageQueueNames, MongoCollections
2✔
9
from sdx_datamodel.parsing.exceptions import (
2✔
10
    AttributeNotSupportedException,
11
    ServiceNotSupportedException,
12
)
13
from sdx_pce.load_balancing.te_solver import TESolver
2✔
14
from sdx_pce.topology.temanager import TEManager
2✔
15
from sdx_pce.utils.exceptions import (
2✔
16
    RequestValidationError,
17
    SameSwitchRequestError,
18
    TEError,
19
)
20

21
from sdx_controller.messaging.topic_queue_producer import TopicQueueProducer
2✔
22
from sdx_controller.models.simple_link import SimpleLink
2✔
23
from sdx_controller.utils.parse_helper import ParseHelper
2✔
24

25
logger = logging.getLogger(__name__)
2✔
26
logging.getLogger("pika").setLevel(logging.WARNING)
2✔
27

28

29
class ConnectionHandler:
    """Create, remove, and recover L2VPN connections across OXP domains."""

    def __init__(self, db_instance):
        """Keep handles on the database and on a domain-name parser."""
        self.parse_helper = ParseHelper()
        self.db_instance = db_instance
33

34
    def _send_breakdown_to_lc(self, breakdown, operation, connection_request):
        """Publish a per-domain connection breakdown to the local controllers.

        For every (domain, link) entry in ``breakdown``, this updates the
        link-to-connections bookkeeping in the database and publishes a
        message on the per-domain queue.

        :param breakdown: mapping of domain URN -> link description
            (per-domain slice of the solved connection), or None.
        :param operation: "post" to add the connection to the per-link
            bookkeeping, "delete" to remove it.
        :param connection_request: the original request dict; only its
            "id" is read here.
        :return: (reason, HTTP status) tuple.
        """
        logger.debug(f"-- BREAKDOWN: {json.dumps(breakdown)}")

        if breakdown is None:
            return "Could not break down the solution", 400

        # The link -> [service_id, ...] map is stored as a JSON string.
        link_connections_dict_json = self.db_instance.get_value_from_db(
            MongoCollections.LINKS, Constants.LINK_CONNECTIONS_DICT
        )

        link_connections_dict = (
            json.loads(link_connections_dict_json) if link_connections_dict_json else {}
        )

        interdomain_a, interdomain_b = None, None
        connection_service_id = connection_request.get("id")

        for domain, link in breakdown.items():
            # Collect the port IDs of this domain's UNIs ("uni_a"/"uni_z").
            port_list = []
            for key in link.keys():
                if "uni_" in key and "port_id" in link[key]:
                    port_list.append(link[key]["port_id"])

            if port_list:
                # Intra-domain link: track which connections use it.
                simple_link = SimpleLink(port_list).to_string()

                if simple_link not in link_connections_dict:
                    link_connections_dict[simple_link] = []

                if (
                    operation == "post"
                    and connection_service_id
                    and connection_service_id not in link_connections_dict[simple_link]
                ):
                    link_connections_dict[simple_link].append(connection_service_id)

                if (
                    operation == "delete"
                    and connection_service_id
                    and connection_service_id in link_connections_dict[simple_link]
                ):
                    link_connections_dict[simple_link].remove(connection_service_id)

            # Pair the previous domain's egress (uni_z) with this domain's
            # ingress (uni_a) to form the inter-domain link.  NOTE(review):
            # this relies on breakdown.items() iterating domains in path
            # order — confirm upstream guarantees ordering.
            if interdomain_a:
                interdomain_b = link.get("uni_a", {}).get("port_id")
            else:
                interdomain_a = link.get("uni_z", {}).get("port_id")

            if interdomain_a and interdomain_b:
                # Track the connection on the inter-domain link as well.
                simple_link = SimpleLink([interdomain_a, interdomain_b]).to_string()

                if simple_link not in link_connections_dict:
                    link_connections_dict[simple_link] = []

                if (
                    operation == "post"
                    and connection_service_id
                    and connection_service_id not in link_connections_dict[simple_link]
                ):
                    link_connections_dict[simple_link].append(connection_service_id)

                if (
                    operation == "delete"
                    and connection_service_id
                    and connection_service_id in link_connections_dict[simple_link]
                ):
                    link_connections_dict[simple_link].remove(connection_service_id)

                # This domain's egress becomes the candidate "a" side for
                # the next inter-domain pairing.
                interdomain_a = link.get("uni_z", {}).get("port_id")

            # Persist the (possibly updated) bookkeeping each iteration, so
            # the DB stays consistent even if a later publish fails.
            self.db_instance.add_key_value_pair_to_db(
                MongoCollections.LINKS,
                Constants.LINK_CONNECTIONS_DICT,
                json.dumps(link_connections_dict),
            )

            logger.debug(f"Attempting to publish domain: {domain}, link: {link}")

            # From "urn:ogf:network:sdx:topology:amlight.net", attempt to
            # extract a string like "amlight".
            domain_name = self.parse_helper.find_domain_name(domain, ":") or f"{domain}"
            exchange_name = MessageQueueNames.CONNECTIONS

            logger.debug(
                f"Doing '{operation}' operation for '{link}' with exchange_name: {exchange_name}, "
                f"routing_key: {domain_name}"
            )
            mq_link = {
                "operation": operation,
                "service_id": connection_service_id,
                "link": link,
            }
            # One short-lived producer per domain; routing key targets the
            # domain's local controller.
            producer = TopicQueueProducer(
                timeout=5, exchange_name=exchange_name, routing_key=domain_name
            )
            producer.call(json.dumps(mq_link))
            producer.stop_keep_alive()

        # We will get to this point only if all the previous steps
        # leading up to this point were successful.
        return "Connection published", 201
135

136
    def _store_and_publish(self, te_manager, breakdown, connection_request):
        """Persist a connection breakdown, publish it to the local
        controllers, and refresh the stored topology state.

        Shared tail of both the same-switch and the general solver path
        in place_connection().  Returns the (reason, HTTP code) tuple
        produced by publishing.
        """
        self.db_instance.add_key_value_pair_to_db(
            MongoCollections.BREAKDOWNS, connection_request["id"], breakdown
        )
        status, code = self._send_breakdown_to_lc(
            breakdown, "post", connection_request
        )
        logger.debug(f"Breakdown sent to LC, status: {status}, code: {code}")
        # update topology in DB with updated states (bandwidth and available vlan pool)
        topology_db_update(self.db_instance, te_manager)
        return status, code

    def place_connection(
        self, te_manager: TEManager, connection_request: dict
    ) -> Tuple[str, int]:
        """
        Do the actual work of creating a connection.

        This method will call pce library to generate a breakdown
        across relevant domains, and then send individual connection
        requests to each of those domains.

        Note that we can return early if things fail.  Return value is
        a tuple of the form (reason, HTTP code).
        """
        graph = te_manager.generate_graph_te()
        if graph is None:
            return "No SDX topology found", 424
        try:
            traffic_matrix = te_manager.generate_traffic_matrix(
                connection_request=connection_request
            )
        except RequestValidationError as request_err:
            err = traceback.format_exc().replace("\n", ", ")
            logger.error(
                f"Error when parsing and validating request: {request_err} - {err}"
            )
            return f"Error: {request_err}", request_err.request_code
        except ServiceNotSupportedException as service_err:
            err = traceback.format_exc().replace("\n", ", ")
            logger.error(
                f"Error when parsing and validating request: {service_err} - {err}"
            )
            return f"Error: {service_err}", 402
        except AttributeNotSupportedException as attr_err:
            err = traceback.format_exc().replace("\n", ", ")
            logger.error(
                f"Error when parsing and validating request: {attr_err} - {err}"
            )
            return f"Error: {attr_err}", 422
        except SameSwitchRequestError as ctx:
            # Both endpoints are on one switch: no solver needed, the
            # breakdown is computed directly from the request's ports.
            logger.debug(
                f"{str(ctx)},{ctx.request_id},{ctx.domain_id},{ctx.ingress_port},{ctx.egress_port}, {ctx.ingress_user_port_tag}, {ctx.egress_user_port_tag}"
            )
            try:
                breakdown = te_manager.generate_connection_breakdown_same_switch(
                    ctx.request_id,
                    ctx.domain_id,
                    ctx.ingress_port,
                    ctx.egress_port,
                    ctx.ingress_user_port_tag,
                    ctx.egress_user_port_tag,
                )
                return self._store_and_publish(
                    te_manager, breakdown, connection_request
                )
            except TEError as te_err:
                # Propagate the HTTP code supplied by the PCE.
                return f"PCE error: {te_err}", te_err.te_code
            except Exception as e:
                err = traceback.format_exc().replace("\n", ", ")
                logger.error(f"Error when generating/publishing breakdown: {e} - {err}")
                return f"Error: {e}", 410

        # General case: traffic_matrix is not None
        if traffic_matrix is None:
            return (
                "Request does not have a valid JSON or body is incomplete/incorrect",
                400,
            )

        logger.info(f"Generated graph: '{graph}', traffic matrix: '{traffic_matrix}'")
        try:
            conn = te_manager.requests_connectivity(traffic_matrix)
            if conn is False:
                logger.error(f"Graph connectivity: {conn}")
                raise TEError("No path is available, the graph is not connected", 412)
        except TEError as te_err:
            return f"PCE error: {te_err}", te_err.te_code

        solver = TESolver(graph, traffic_matrix)
        solution = solver.solve()
        logger.debug(f"TESolver result: {solution}")

        if solution is None or solution.connection_map is None:
            return "Could not solve the request", 410

        try:
            breakdown = te_manager.generate_connection_breakdown(
                solution, connection_request
            )
            return self._store_and_publish(te_manager, breakdown, connection_request)
        except TEError as te_err:
            # Propagate the HTTP code supplied by the PCE.
            return f"PCE error: {te_err}", te_err.te_code
        except Exception as e:
            err = traceback.format_exc().replace("\n", ", ")
            logger.error(f"Error when generating/publishing breakdown: {e} - {err}")
            return f"Error: {e}", 410
256

257
    def archive_connection(self, service_id) -> None:
        """Move a connection request from the active collection to the
        historical collection, keyed by the archival timestamp.

        No-op when the connection is unknown.  (Also removes a useless
        self-assignment and unifies the two previously duplicated DB
        write branches.)
        """
        connection_request = self.db_instance.get_value_from_db(
            MongoCollections.CONNECTIONS, service_id
        )
        if not connection_request:
            return

        self.db_instance.delete_one_entry(MongoCollections.CONNECTIONS, service_id)

        # Appending to an empty list is equivalent to the old "create a
        # fresh single-entry list" branch.
        historical_connections_list = (
            self.db_instance.get_value_from_db(
                MongoCollections.HISTORICAL_CONNECTIONS, service_id
            )
            or []
        )
        # Current timestamp in seconds, used as the archive key.
        timestamp = int(time.time())
        historical_connections_list.append({timestamp: connection_request})
        self.db_instance.add_key_value_pair_to_db(
            MongoCollections.HISTORICAL_CONNECTIONS,
            service_id,
            historical_connections_list,
        )
        logger.debug(f"Archived connection: {service_id}")
287

288
    def remove_connection(self, te_manager, service_id) -> Tuple[str, int]:
        """Tear down a connection: drop it from the TE manager, notify
        the local controllers, and archive the original request.

        Returns a (reason, HTTP status) tuple.
        """
        te_manager.delete_connection(service_id)

        # Guard clauses: both the original request and its breakdown
        # must still be on record.
        request = self.db_instance.get_value_from_db(
            MongoCollections.CONNECTIONS, service_id
        )
        if not request:
            return "Did not find connection request, cannot remove connection", 404

        breakdown = self.db_instance.get_value_from_db(
            MongoCollections.BREAKDOWNS, service_id
        )
        if not breakdown:
            return "Did not find breakdown, cannot remove connection", 404

        try:
            status, code = self._send_breakdown_to_lc(breakdown, "delete", request)
            self.db_instance.delete_one_entry(MongoCollections.BREAKDOWNS, service_id)
            self.archive_connection(service_id)
            logger.debug(f"Breakdown sent to LC, status: {status}, code: {code}")
            # update topology in DB with updated states (bandwidth and available vlan pool)
            topology_db_update(self.db_instance, te_manager)
            return status, code
        except Exception as e:
            logger.debug(f"Error when removing breakdown: {e}")
            return f"Error when removing breakdown: {e}", 400
315

316
    def handle_link_removal(self, te_manager, removed_links):
        """Treat removed links as failed and recover affected connections."""
        logger.debug("Handling connections that contain removed links.")
        failed_links = [
            {"id": link.id, "ports": link.ports} for link in removed_links
        ]
        self.handle_link_failure(te_manager, failed_links)
323

324
    def handle_link_failure(self, te_manager, failed_links):
        """Recover connections that traverse failed links.

        For each failed link, look up the affected service IDs in the
        stored link-to-connections map, mark each connection ERROR,
        tear it down, then attempt to re-place it (RECOVERING).

        :param te_manager: the TE manager used for teardown/re-placement.
        :param failed_links: list of dicts with "id" and "ports" keys;
            each port is either a port-id string or a dict with an "id".
        """
        logger.debug("Handling connections that contain failed links.")
        link_connections_dict = self.db_instance.get_value_from_db(
            MongoCollections.LINKS, Constants.LINK_CONNECTIONS_DICT
        )

        if not link_connections_dict:
            logger.debug("No connection has been placed yet.")
            return

        # The map is stored as a JSON string (see _send_breakdown_to_lc).
        link_connections_dict = json.loads(link_connections_dict)

        for link in failed_links:
            logger.info(f"Handling link failure on {link['id']}")
            port_list = []
            if "ports" not in link:
                continue
            for port in link["ports"]:
                # Ports may be plain id strings or dicts carrying an "id".
                port_id = port if isinstance(port, str) else port.get("id")
                if not port_id:
                    continue
                port_list.append(port_id)

            # Same canonical key format used when the map was written.
            simple_link = SimpleLink(port_list).to_string()

            if simple_link in link_connections_dict:
                logger.debug("Found failed link record!")
                service_ids = link_connections_dict[simple_link]
                # NOTE(review): `index` is unused; remove_connection below
                # also rewrites the stored link map, but we iterate the
                # in-memory snapshot, so iteration is unaffected.
                for index, service_id in enumerate(service_ids):
                    logger.info(
                        f"Connection {service_id} affected by link {link['id']}"
                    )
                    connection = self.db_instance.get_value_from_db(
                        MongoCollections.CONNECTIONS, service_id
                    )
                    if not connection:
                        logger.debug(f"Did not find connection from db: {service_id}")
                        continue

                    try:
                        logger.debug(f"Link Failure: Removing connection: {connection}")
                        # Connections without a recorded status can't go
                        # through the state machine; set ERROR directly.
                        if connection.get("status") is None:
                            connection["status"] = str(
                                ConnectionStateMachine.State.ERROR
                            )
                        else:
                            connection, _ = connection_state_machine(
                                connection, ConnectionStateMachine.State.ERROR
                            )
                        logger.info(
                            f"Removing connection: {service_id} {connection.get('status')}"
                        )
                        self.remove_connection(te_manager, connection["id"])
                    except Exception as err:
                        # Best-effort: a failed teardown must not stop
                        # recovery of the remaining connections.
                        logger.info(
                            f"Encountered error when deleting connection: {err}"
                        )
                        continue

                    logger.debug("Removed connection:")
                    logger.debug(connection)
                    # Transition ERROR -> RECOVERING and reset the OXP
                    # progress counter before re-placing the connection.
                    connection, _ = connection_state_machine(
                        connection, ConnectionStateMachine.State.RECOVERING
                    )
                    connection["oxp_success_count"] = 0
                    self.db_instance.add_key_value_pair_to_db(
                        MongoCollections.CONNECTIONS, service_id, connection
                    )
                    _reason, code = self.place_connection(te_manager, connection)
                    # Any non-2xx code means re-placement failed; record
                    # the connection as ERROR.
                    if code // 100 != 2:
                        connection, _ = connection_state_machine(
                            connection, ConnectionStateMachine.State.ERROR
                        )
                        self.db_instance.add_key_value_pair_to_db(
                            MongoCollections.CONNECTIONS,
                            service_id,
                            connection,
                        )

                    logger.info(
                        f"place_connection result: ID: {service_id} reason='{_reason}', code={code}"
                    )
406

407
    def get_archived_connections(self, service_id: str):
        """Return the archived entries for *service_id*, or None when
        nothing has been archived under that ID."""
        return (
            self.db_instance.get_value_from_db(
                MongoCollections.HISTORICAL_CONNECTIONS, service_id
            )
            or None
        )
414

415

416
def topology_db_update(db_instance, te_manager):
    """Persist every per-OXP topology, plus the merged latest topology,
    into the topologies collection."""
    topology_manager = te_manager.topology_manager

    # One document per OXP domain, keyed by domain name.
    for domain_name, topology in topology_manager.get_topology_map().items():
        db_instance.add_key_value_pair_to_db(
            MongoCollections.TOPOLOGIES, domain_name, topology.to_dict()
        )

    # use 'latest_topo' as PK to save latest full topo to db
    db_instance.add_key_value_pair_to_db(
        MongoCollections.TOPOLOGIES,
        Constants.LATEST_TOPOLOGY,
        topology_manager.get_topology().to_dict(),
    )
    logger.info("Save to database complete.")
430

431

432
def get_connection_status(db, service_id: str):
    """
    Form a response to `GET /l2vpn/1.0/{service_id}`.

    Reads the saved breakdown and the original connection request for
    *service_id* from the database and shapes them into the response
    format of the provisioning spec.  Returns an empty dict when no
    breakdown (or no per-service domain map) is found.
    """
    assert db is not None
    assert service_id is not None

    breakdown = db.read_from_db(MongoCollections.BREAKDOWNS, service_id)
    if not breakdown:
        logger.info(f"Could not find breakdown for {service_id}")
        return {}

    logger.info(f"breakdown for {service_id}: {breakdown}")

    # The breakdown we read from DB is in this shape:
    #
    # {
    #     "_id": ObjectId("66ec71770c7022eb0922f41a"),
    #     "<service_id>": {
    #         "urn:sdx:topology:amlight.net": {
    #             "name": "AMLIGHT_vlan_1000_10001",
    #             "dynamic_backup_path": True,
    #             "uni_a": {
    #                 "tag": {"value": 1000, "tag_type": 1},
    #                 "port_id": "urn:sdx:port:amlight.net:A1:1",
    #             },
    #             "uni_z": {
    #                 "tag": {"value": 10001, "tag_type": 1},
    #                 "port_id": "urn:sdx:port:amlight.net:B1:3",
    #             },
    #         }
    #     },
    # }
    #
    # We need to shape that into this form, at a minimum:
    #
    # {
    #     "<service_id>": {
    #         "service_id": "<service_id>",
    #         "name": "new-connection",
    #         "endpoints": [
    #             {"port_id": "urn:sdx:port:amlight.net:A1:1", "vlan": "150"},
    #             {"port_id": "urn:sdx:port:amlight:B1:1", "vlan": "300"},
    #         ],
    #     }
    # }
    #
    # See https://sdx-docs.readthedocs.io/en/latest/specs/provisioning-api-1.0.html#request-format-2

    domains = breakdown.get(service_id)
    if domains is None:
        # The stored document exists but lacks the per-service domain
        # map; treat this like a missing breakdown instead of raising
        # AttributeError below.
        logger.error(f"Breakdown for {service_id} has no domain map")
        return {}
    logger.info(f"domains for {service_id}: {domains.keys()}")

    # Defaults used when the original connection request cannot be
    # found.  oxp_response must be initialized here too: previously it
    # was assigned only inside the else branch below, which made the
    # final `if oxp_response:` raise UnboundLocalError for an unknown
    # request.
    name = "unknown"
    description = "unknown"
    status = "unknown"
    qos_metrics = {}
    scheduling = {}
    notifications = {}
    oxp_response = None

    endpoints = list()
    response_endpoints = []
    request_uni_a_id = None
    request_uni_z_id = None

    request = db.read_from_db(MongoCollections.CONNECTIONS, service_id)
    if not request:
        logger.error(f"Can't find a connection request for {service_id}")
        # TODO: we're in a strange state here. Should we panic?
    else:
        logger.info(f"Found request for {service_id}: {request}")
        # We seem to have saved the original request in the form of a
        # string into the DB, not a record.
        request_dict = request.get(service_id)
        name = request_dict.get("name")
        description = request_dict.get("description")
        qos_metrics = request_dict.get("qos_metrics")
        scheduling = request_dict.get("scheduling")
        notifications = request_dict.get("notifications")
        oxp_response = request_dict.get("oxp_response")
        status = parse_conn_status(request_dict.get("status"))
        if request_dict.get("endpoints") is not None:  # spec version 2.0.0
            request_endpoints = request_dict.get("endpoints")
            request_uni_a = request_endpoints[0]
            request_uni_a_id = request_uni_a.get("port_id")
            if request_uni_a_id is None:
                request_uni_a_id = request_uni_a.get("id")
            request_uni_z = request_endpoints[1]
            request_uni_z_id = request_uni_z.get("port_id")
            if request_uni_z_id is None:
                request_uni_z_id = request_uni_z.get("id")
        else:  # spec version 1.0.0
            request_uni_a = request_dict.get("ingress_port")
            request_uni_a_id = request_uni_a.get("id")
            request_uni_z = request_dict.get("egress_port")
            request_uni_z_id = request_uni_z.get("id")

    def _collect(endpoint):
        """Record *endpoint* on the current path; also record it on the
        response endpoint list (deduplicated) when its port matches one
        of the requested UNIs."""
        endpoints.append(endpoint)
        if endpoint["port_id"] in (request_uni_a_id, request_uni_z_id):
            if endpoint not in response_endpoints:
                response_endpoints.append(endpoint)

    response = {}

    for domain, domain_breakdown in domains.items():
        uni_a = domain_breakdown.get("uni_a")
        uni_z = domain_breakdown.get("uni_z")

        _collect(
            {
                "port_id": uni_a.get("port_id"),
                "vlan": str(uni_a.get("tag").get("value")),
            }
        )
        _collect(
            {
                "port_id": uni_z.get("port_id"),
                "vlan": str(uni_z.get("tag").get("value")),
            }
        )
        # Was a stray print() to stdout; log it instead.
        logger.info(
            f"endpoints info: {request_uni_a_id}, {request_uni_z_id}, "
            f"{uni_a.get('port_id')}, {uni_z.get('port_id')}"
        )

    # TODO: we're missing many of the attributes in the response here
    # which have been specified in the provisioning spec, such as:
    # name, description, qos_metrics, notifications, ownership,
    # creation_date, archived_date, status, state, counters_location,
    # last_modified, current_path, oxp_service_ids.  Implementing each
    # of them would be worth a separate ticket each, so we'll just
    # make do with this minimal response for now.
    response[service_id] = {
        "service_id": service_id,
        "name": name,
        "description": description,
        "status": status,
        "endpoints": response_endpoints,
        "current_path": endpoints,
        "archived_date": 0,
    }
    if qos_metrics:
        response[service_id]["qos_metrics"] = qos_metrics

    if scheduling:
        response[service_id]["scheduling"] = scheduling

    if notifications:
        response[service_id]["notifications"] = notifications

    if oxp_response:
        response[service_id]["oxp_response"] = oxp_response

    logger.info(f"Formed a response: {response}")

    return response
622

623

624
def connection_state_machine(connection, new_state):
    """Transition *connection* to *new_state* via its state machine.

    Updates connection["status"] in place and returns the connection
    together with the state machine instance.
    """
    machine = ConnectionStateMachine()
    # Seed the machine from the connection's current status string.
    machine.set_state(machine.State[connection.get("status")])
    machine.transition(new_state)
    connection["status"] = str(machine.get_state())
    return connection, machine
632

633

634
def parse_conn_status(conn_state):
    """Parse connection from state to status as specified on the
    Provisioning Data Model Spec 1.0a. As per the spec:
    - up: if the L2VPN is operational
    - down: if the L2VPN is not operational due to topology issues/lack of path, or endpoints being down,
    - error: when there is an error with the L2VPN,
    - under provisioning: when the L2VPN is still being provisioned by the OXPs
    - maintenance: when the L2VPN is being affected by a network maintenance

    Unrecognized states map to "error".
    """
    if conn_state == "UP":
        return "up"
    if conn_state in ("UNDER_PROVISIONING", "MODIFYING"):
        return "under provisioning"
    if conn_state in ("RECOVERING", "DOWN", "ERROR"):
        return "down"
    return "error"
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc