atlanticwave-sdx / sdx-controller · build 13204339783 · 07 Feb 2025 04:54PM UTC

Coverage: 55.423% (down from 55.567%)
Pull Request #411: Better error handling (merge 5ca8513b9 into 2a492f0bc)

2 of 11 new or added lines in 2 files covered (18.18%).
20 existing lines in 1 file are now uncovered.
1119 of 2019 relevant lines covered (55.42%).
1.11 hits per line.

Source file: /sdx_controller/handlers/connection_handler.py (file coverage: 72.66%)

import json
import logging
import time
import traceback
from typing import Tuple

from sdx_datamodel.parsing.exceptions import ServiceNotSupportedException
from sdx_pce.load_balancing.te_solver import TESolver
from sdx_pce.topology.temanager import TEManager
from sdx_pce.utils.exceptions import RequestValidationError, TEError

from sdx_controller.messaging.topic_queue_producer import TopicQueueProducer
from sdx_controller.models.simple_link import SimpleLink
from sdx_controller.utils.parse_helper import ParseHelper

logger = logging.getLogger(__name__)
logging.getLogger("pika").setLevel(logging.WARNING)


class ConnectionHandler:
    def __init__(self, db_instance):
        self.db_instance = db_instance
        self.parse_helper = ParseHelper()

    def _send_breakdown_to_lc(self, breakdown, operation, connection_request):
        """
        Publish a per-domain breakdown to local controllers.

        Records which connection requests use which links in the
        "links" collection, then publishes each domain's portion of
        the breakdown to the "connection" exchange, with the domain
        name as the routing key.  Returns a (reason, HTTP code) tuple.
        """
        logger.debug(f"-- BREAKDOWN: {json.dumps(breakdown)}")

        if breakdown is None:
            return "Could not break down the solution", 400

        link_connections_dict_json = (
            self.db_instance.read_from_db("links", "link_connections_dict")[
                "link_connections_dict"
            ]
            if self.db_instance.read_from_db("links", "link_connections_dict")
            else None
        )

        if link_connections_dict_json:
            link_connections_dict = json.loads(link_connections_dict_json)
        else:
            link_connections_dict = {}

        interdomain_a, interdomain_b = None, None
        for domain, link in breakdown.items():
            port_list = []
            for key in link.keys():
                if "uni_" in key and "port_id" in link[key]:
                    port_list.append(link[key]["port_id"])

            if port_list:
                simple_link = SimpleLink(port_list).to_string()

                if simple_link not in link_connections_dict:
                    link_connections_dict[simple_link] = []

                if (
                    operation == "post"
                    and connection_request not in link_connections_dict[simple_link]
                ):
                    link_connections_dict[simple_link].append(connection_request)

                if (
                    operation == "delete"
                    and connection_request in link_connections_dict[simple_link]
                ):
                    link_connections_dict[simple_link].remove(connection_request)

                self.db_instance.add_key_value_pair_to_db(
                    "links", "link_connections_dict", json.dumps(link_connections_dict)
                )

            if interdomain_a:
                interdomain_b = link.get("uni_a", {}).get("port_id")
            else:
                interdomain_a = link.get("uni_z", {}).get("port_id")

            if interdomain_a and interdomain_b:
                simple_link = SimpleLink([interdomain_a, interdomain_b]).to_string()

                if simple_link not in link_connections_dict:
                    link_connections_dict[simple_link] = []

                if (
                    operation == "post"
                    and connection_request not in link_connections_dict[simple_link]
                ):
                    link_connections_dict[simple_link].append(connection_request)

                if (
                    operation == "delete"
                    and connection_request in link_connections_dict[simple_link]
                ):
                    link_connections_dict[simple_link].remove(connection_request)

                self.db_instance.add_key_value_pair_to_db(
                    "links", "link_connections_dict", json.dumps(link_connections_dict)
                )

                interdomain_a = link.get("uni_z", {}).get("port_id")

            logger.debug(f"Attempting to publish domain: {domain}, link: {link}")

            # From "urn:ogf:network:sdx:topology:amlight.net", attempt to
            # extract a string like "amlight".
            domain_name = self.parse_helper.find_domain_name(domain, ":") or f"{domain}"
            exchange_name = "connection"

            logger.debug(
                f"Doing '{operation}' operation for '{link}' with exchange_name: {exchange_name}, "
                f"routing_key: {domain_name}"
            )
            mq_link = {
                "operation": operation,
                "service_id": connection_request.get("id"),
                "link": link,
            }
            producer = TopicQueueProducer(
                timeout=5, exchange_name=exchange_name, routing_key=domain_name
            )
            producer.call(json.dumps(mq_link))
            producer.stop_keep_alive()

        # We will get to this point only if all the previous steps
        # leading up to this point were successful.
        return "Connection published", 200

    def place_connection(
        self, te_manager: TEManager, connection_request: dict
    ) -> Tuple[str, int]:
        """
        Do the actual work of creating a connection.

        This method will call the PCE library to generate a breakdown
        across relevant domains, and then send individual connection
        requests to each of those domains.

        Note that we can return early if things fail.  Return value is
        a tuple of the form (reason, HTTP code).  An illustrative usage
        sketch follows this method.
        """
        # for num, val in enumerate(te_manager.get_topology_map().values()):
        #     logger.debug(f"TE topology #{num}: {val}")

        graph = te_manager.generate_graph_te()
        if graph is None:
            return "No SDX topology found", 424
        try:
            traffic_matrix = te_manager.generate_traffic_matrix(
                connection_request=connection_request
            )
        except RequestValidationError as request_err:
            err = traceback.format_exc().replace("\n", ", ")
            logger.error(
                f"Error when parsing and validating request: {request_err} - {err}"
            )
            return f"Error: {request_err}", request_err.request_code
        except ServiceNotSupportedException as service_err:
            err = traceback.format_exc().replace("\n", ", ")
            logger.error(
                f"Error when parsing and validating request: {service_err} - {err}"
            )
            return f"Error: {service_err}", 402

        if traffic_matrix is None:
            return (
                "Request does not have a valid JSON or body is incomplete/incorrect",
                400,
            )

        logger.info(f"Generated graph: '{graph}', traffic matrix: '{traffic_matrix}'")
        try:
            conn = te_manager.requests_connectivity(traffic_matrix)
            if conn is False:
                logger.error(f"Graph connectivity: {conn}")
                raise TEError("No path is available, the graph is not connected", 412)
        except TEError as te_err:
            return f"PCE error: {te_err}", te_err.te_code

        solver = TESolver(graph, traffic_matrix)
        solution = solver.solve()
        logger.debug(f"TESolver result: {solution}")

        if solution is None or solution.connection_map is None:
            return "Could not solve the request", 410

        try:
            breakdown = te_manager.generate_connection_breakdown(
                solution, connection_request
            )
            self.db_instance.add_key_value_pair_to_db(
                "breakdowns", connection_request["id"], breakdown
            )
            status, code = self._send_breakdown_to_lc(
                breakdown, "post", connection_request
            )
            logger.debug(f"Breakdown sent to LC, status: {status}, code: {code}")
            return status, code
        except TEError as te_err:
            # Note: te_err.te_code is an HTTP-style code chosen by the
            # PCE.  Arguably the PCE should not dictate HTTP error codes,
            # since that crosses abstraction boundaries, but we pass the
            # code through here.
            return f"PCE error: {te_err}", te_err.te_code
        except Exception as e:
            err = traceback.format_exc().replace("\n", ", ")
            logger.error(f"Error when generating/publishing breakdown: {e} - {err}")
            return f"Error: {e}", 410
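    # Illustrative usage only (this wiring is an assumption, not part of
    # this module): a caller that already holds a TEManager and a parsed
    # connection request might drive placement like this:
    #
    #     handler = ConnectionHandler(db_instance)
    #     reason, code = handler.place_connection(te_manager, connection_request)
    #     if code // 100 != 2:
    #         logger.error(f"Could not place connection: {reason}")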

    def archive_connection(self, service_id) -> None:
        """
        Move a connection request from the "connections" collection to
        "historical_connections", keyed by the archival timestamp.
        """
        connection_request = self.db_instance.read_from_db("connections", service_id)
        if not connection_request:
            return

        connection_request_str = connection_request[service_id]
        self.db_instance.delete_one_entry("connections", service_id)

        historical_connections = self.db_instance.read_from_db(
            "historical_connections", service_id
        )
        # Current timestamp in seconds
        timestamp = int(time.time())

        if historical_connections:
            historical_connections_list = historical_connections[service_id]
            historical_connections_list.append(
                json.dumps({timestamp: json.loads(connection_request_str)})
            )
            self.db_instance.add_key_value_pair_to_db(
                "historical_connections", service_id, historical_connections_list
            )
        else:
            self.db_instance.add_key_value_pair_to_db(
                "historical_connections",
                service_id,
                [json.dumps({timestamp: json.loads(connection_request_str)})],
            )
        logger.debug(f"Archived connection: {service_id}")

    def remove_connection(self, te_manager, service_id) -> Tuple[str, int]:
        """
        Remove a connection: delete it from the TE manager, send a
        "delete" breakdown to the local controllers, and archive the
        original request.  Returns a (reason, HTTP code) tuple.
        """
        try:
            te_manager.delete_connection(service_id)
        except Exception as err:
            logger.info(f"Encountered error when deleting connection: {err}")
            return f"Error when removing connection: {err}", 400

        connection_request = self.db_instance.read_from_db("connections", service_id)
        if not connection_request:
            return "Did not find connection request, cannot remove connection", 404

        connection_request = connection_request[service_id]

        breakdown = self.db_instance.read_from_db("breakdowns", service_id)
        if not breakdown:
            return "Did not find breakdown, cannot remove connection", 404
        breakdown = breakdown[service_id]

        try:
            status, code = self._send_breakdown_to_lc(
                breakdown, "delete", json.loads(connection_request)
            )
            self.db_instance.delete_one_entry("breakdowns", service_id)
            self.archive_connection(service_id)
            logger.debug(f"Breakdown sent to LC, status: {status}, code: {code}")
            return status, code
        except Exception as e:
            logger.debug(f"Error when removing breakdown: {e}")
            return f"Error when removing breakdown: {e}", 400

    def handle_link_failure(self, te_manager, failed_links):
        """
        Remove and re-place every connection that traverses a failed
        link.  (An illustrative sketch of the expected `failed_links`
        shape follows this method.)
        """
        logger.debug("---Handling connections that contain failed link.---")
        link_connections_dict_str = self.db_instance.read_from_db(
            "links", "link_connections_dict"
        )

        if (
            not link_connections_dict_str
            or not link_connections_dict_str["link_connections_dict"]
        ):
            logger.debug("No connection has been placed yet.")
            return

        link_connections_dict = json.loads(
            link_connections_dict_str["link_connections_dict"]
        )

        for link in failed_links:
            # Skip links that do not carry a port list.
            if "ports" not in link:
                continue
            logger.info(f"Handling link failure on {link['id']} ({link['ports']})")
            port_list = []
            for port in link["ports"]:
                port_id = port if isinstance(port, str) else port.get("id")
                if not port_id:
                    continue
                port_list.append(port_id)

            simple_link = SimpleLink(port_list).to_string()

            if simple_link in link_connections_dict:
                logger.debug("Found failed link record!")
                connections = link_connections_dict[simple_link]
                # Iterate over a snapshot so that removing entries from
                # the underlying list does not skip connections.
                for connection in list(connections):
                    if "id" not in connection:
                        continue
                    logger.info(
                        f"Connection {connection['id']} affected by link {link['id']}"
                    )
                    self.remove_connection(te_manager, connection["id"])
                    connections.remove(connection)
                    logger.debug("Removed connection:")
                    logger.debug(connection)
                    _reason, code = self.place_connection(te_manager, connection)
                    if code // 100 == 2:
                        self.db_instance.add_key_value_pair_to_db(
                            "connections", connection["id"], json.dumps(connection)
                        )
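    # Illustrative input sketch (an assumption inferred from how the
    # method above reads `failed_links`, not a documented schema): each
    # failed link carries an "id" and a "ports" list whose entries are
    # either port-id strings or dicts with an "id" key, e.g.
    #
    #     failed_links = [
    #         {
    #             "id": "urn:sdx:link:amlight.net:A1/2_B1/2",
    #             "ports": [
    #                 "urn:sdx:port:amlight.net:A1:2",
    #                 {"id": "urn:sdx:port:amlight.net:B1:2"},
    #             ],
    #         },
    #     ]
    #     handler.handle_link_failure(te_manager, failed_links)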

    def get_archived_connections(self, service_id: str):
        """Return the archived entries for a service ID, or None if there are none."""
        historical_connections = self.db_instance.read_from_db(
            "historical_connections", service_id
        )
        if not historical_connections:
            return None
        return historical_connections[service_id]


def get_connection_status(db, service_id: str):
    """
    Form a response to `GET /l2vpn/1.0/{service_id}`.
    """
    assert db is not None
    assert service_id is not None

    breakdown = db.read_from_db("breakdowns", service_id)
    if not breakdown:
        logger.info(f"Could not find breakdown for {service_id}")
        return None

    logger.info(f"breakdown for {service_id}: {breakdown}")

    # The breakdown we read from DB is in this shape:
    #
    # {
    #     "_id": ObjectId("66ec71770c7022eb0922f41a"),
    #     "5b7df397-2269-489b-8e03-f256461265a0": {
    #         "urn:sdx:topology:amlight.net": {
    #             "name": "AMLIGHT_vlan_1000_10001",
    #             "dynamic_backup_path": True,
    #             "uni_a": {
    #                 "tag": {"value": 1000, "tag_type": 1},
    #                 "port_id": "urn:sdx:port:amlight.net:A1:1",
    #             },
    #             "uni_z": {
    #                 "tag": {"value": 10001, "tag_type": 1},
    #                 "port_id": "urn:sdx:port:amlight.net:B1:3",
    #             },
    #         }
    #     },
    # }
    #
    # We need to shape that into this form, at a minimum:
    #
    # {
    #     "c73da8e1-5d03-4620-a1db-7cdf23e8978c": {
    #         "service_id": "c73da8e1-5d03-4620-a1db-7cdf23e8978c",
    #         "name": "new-connection",
    #         "endpoints": [
    #             {
    #                 "port_id": "urn:sdx:port:amlight.net:A1:1",
    #                 "vlan": "150"
    #             },
    #             {
    #                 "port_id": "urn:sdx:port:amlight:B1:1",
    #                 "vlan": "300"
    #             }
    #         ],
    #     }
    # }
    #
    # See https://sdx-docs.readthedocs.io/en/latest/specs/provisioning-api-1.0.html#request-format-2
    #

    domains = breakdown.get(service_id)
    logger.info(f"domains for {service_id}: {domains.keys()}")

    # Find the name and description from the original connection
    # request for this service_id.
    name = "unknown"
    description = "unknown"
    qos_metrics = {}
    scheduling = {}
    notifications = {}
    # These stay None unless the stored request carries OXP response data.
    oxp_response_code = None
    oxp_response = None

    endpoints = list()
    request_endpoints = []
    response_endpoints = []
    request_uni_a_id = None
    request_uni_z_id = None

    request = db.read_from_db("connections", service_id)
    if not request:
        logger.error(f"Can't find a connection request for {service_id}")
        # TODO: we're in a strange state here. Should we panic?
    else:
        logger.info(f"Found request for {service_id}: {request}")
        # We seem to have saved the original request in the form of a
        # string into the DB, not a record.
        request_dict = json.loads(request.get(service_id))
        name = request_dict.get("name")
        description = request_dict.get("description")
        qos_metrics = request_dict.get("qos_metrics")
        scheduling = request_dict.get("scheduling")
        notifications = request_dict.get("notifications")
        oxp_response_code = request_dict.get("oxp_response_code")
        oxp_response = request_dict.get("oxp_response")
        if request_dict.get("endpoints") is not None:  # spec version 2.0.0
            request_endpoints = request_dict.get("endpoints")
            request_uni_a = request_endpoints[0]
            request_uni_a_id = request_uni_a.get("port_id")
            if request_uni_a_id is None:
                request_uni_a_id = request_uni_a.get("id")
            request_uni_z = request_endpoints[1]
            request_uni_z_id = request_uni_z.get("port_id")
            if request_uni_z_id is None:
                request_uni_z_id = request_uni_z.get("id")
        else:  # spec version 1.0.0
            request_uni_a = request_dict.get("ingress_port")
            request_uni_a_id = request_uni_a.get("id")
            request_uni_z = request_dict.get("egress_port")
            request_uni_z_id = request_uni_z.get("id")

    response = {}

    for domain, breakdown in domains.items():
        uni_a_port = breakdown.get("uni_a").get("port_id")
        uni_a_vlan = breakdown.get("uni_a").get("tag").get("value")

        endpoint_a = {
            "port_id": uni_a_port,
            "vlan": str(uni_a_vlan),
        }

        endpoints.append(endpoint_a)

        if request_uni_a_id == uni_a_port and endpoint_a not in response_endpoints:
            response_endpoints.append(endpoint_a)
        if request_uni_z_id == uni_a_port and endpoint_a not in response_endpoints:
            response_endpoints.append(endpoint_a)

        uni_z_port = breakdown.get("uni_z").get("port_id")
        uni_z_vlan = breakdown.get("uni_z").get("tag").get("value")

        endpoint_z = {
            "port_id": uni_z_port,
            "vlan": str(uni_z_vlan),
        }

        endpoints.append(endpoint_z)

        if request_uni_a_id == uni_z_port and endpoint_z not in response_endpoints:
            response_endpoints.append(endpoint_z)
        if request_uni_z_id == uni_z_port and endpoint_z not in response_endpoints:
            response_endpoints.append(endpoint_z)
        print(
            f"endpoints info: {request_uni_a_id}, {request_uni_z_id}, {uni_a_port}, {uni_z_port}"
        )

    # TODO: we're missing many of the attributes in the response here
    # which have been specified in the provisioning spec, such as:
    # name, description, qos_metrics, notifications, ownership,
    # creation_date, archived_date, status, state, counters_location,
    # last_modified, current_path, oxp_service_ids.  Implementing each
    # of them would be worth a separate ticket, so we'll just make do
    # with this minimal response for now.
    response[service_id] = {
        "service_id": service_id,
        "name": name,
        "description": description,
        "endpoints": response_endpoints,
        "current_path": endpoints,
        "archived_date": 0,
    }
    if qos_metrics:
        response[service_id]["qos_metrics"] = qos_metrics

    if scheduling:
        response[service_id]["scheduling"] = scheduling

    if notifications:
        response[service_id]["notifications"] = notifications

    if oxp_response_code:
        response[service_id]["oxp_response_code"] = oxp_response_code

    if oxp_response:
        response[service_id]["oxp_response"] = oxp_response

    logger.info(f"Formed a response: {response}")

    return response
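# Illustrative usage only (the route wiring is an assumption, not part
# of this module): an API handler for GET /l2vpn/1.0/{service_id} might
# use this function along these lines:
#
#     status = get_connection_status(db_instance, service_id)
#     if not status:
#         return "Service ID not found", 404
#     return status[service_id], 200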