• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

atlanticwave-sdx / sdx-controller / 25450424367

06 May 2026 05:21PM UTC coverage: 49.499% (-3.3%) from 52.799%
25450424367

Pull #524

github

web-flow
Merge f97dea5a4 into 26d01cbb9
Pull Request #524: Rollback on failed OXP POST

109 of 412 new or added lines in 5 files covered. (26.46%)

16 existing lines in 4 files now uncovered.

1382 of 2792 relevant lines covered (49.5%)

0.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

39.61
/sdx_controller/controllers/l2vpn_controller.py
1
import copy
2✔
2
import logging
2✔
3
import os
2✔
4
import time
2✔
5
import uuid
2✔
6

7
import connexion
2✔
8
from flask import current_app
2✔
9
from sdx_datamodel.connection_sm import ConnectionStateMachine
2✔
10
from sdx_datamodel.constants import MongoCollections
2✔
11

12
from sdx_controller.handlers.connection_handler import (
2✔
13
    ConnectionHandler,
14
    connection_state_machine,
15
    get_connection_status,
16
    parse_conn_status,
17
)
18

19
# from sdx_controller.models.l2vpn_service_id_body import L2vpnServiceIdBody  # noqa: E501
20
from sdx_controller.utils.db_utils import DbUtils
2✔
21

22
LOG_FORMAT = (
    "%(levelname) -10s %(asctime)s %(name) -30s %(funcName) "
    "-35s %(lineno) -5d: %(message)s"
)
logger = logging.getLogger(__name__)
logging.getLogger("pika").setLevel(logging.WARNING)
logger.setLevel(logging.getLevelName(os.getenv("LOG_LEVEL", "DEBUG")))

# Timeout and poll interval (seconds) used while waiting for a rejected
# rollback re-placement to leave UNDER_PROVISIONING.
ROLLBACK_SETTLE_TIMEOUT_SECONDS = float(
    os.getenv("ROLLBACK_SETTLE_TIMEOUT_SECONDS", "5")
)
ROLLBACK_SETTLE_POLL_SECONDS = float(os.getenv("ROLLBACK_SETTLE_POLL_SECONDS", "0.2"))

# PATCH must wait for async OXP provisioning responses before deciding whether
# the new service is really up or needs rollback to the previous request.
# Parsed as float (like the rollback settings above) so fractional values such
# as "0.5" are accepted instead of raising ValueError at import time; integer
# strings keep working unchanged.
PATCH_PROVISIONING_SETTLE_TIMEOUT_SECONDS = float(
    os.getenv("PATCH_PROVISIONING_SETTLE_TIMEOUT_SECONDS", "10")
)
PATCH_PROVISIONING_SETTLE_POLL_SECONDS = float(
    os.getenv("PATCH_PROVISIONING_SETTLE_POLL_SECONDS", "1")
)
42

43
# Get DB connection and tables set up.
# These module-level objects are created at import time and shared by every
# request handler in this module; connection_handler is built on the same
# DbUtils instance as db_instance.
# NOTE(review): whether DbUtils()/initialize_db() contact the database eagerly
# is not visible here -- if they do, importing this module requires a
# reachable database.
db_instance = DbUtils()
db_instance.initialize_db()
connection_handler = ConnectionHandler(db_instance)
47

48

49
def _wait_for_patch_provisioning_to_settle(service_id):
    """Poll the stored connection until an async provisioning attempt settles.

    A connection counts as settled once its status has left
    UNDER_PROVISIONING. The exception is a connection with
    ``partial_cleanup_requested`` set: polling then continues while no
    breakdown is stored, or while fewer ``oxp_response`` entries than
    breakdown entries have arrived.

    Polling stops after PATCH_PROVISIONING_SETTLE_TIMEOUT_SECONDS, sleeping
    PATCH_PROVISIONING_SETTLE_POLL_SECONDS between reads. The deadline is
    measured with ``time.monotonic()`` so wall-clock adjustments cannot
    shorten or extend the wait.

    :param service_id: ID of the connection being (re)provisioned.
    :return: The settled connection record; the last record read if the
        timeout expires first; or None if the record disappears while polling.
    """
    under_provisioning = str(ConnectionStateMachine.State.UNDER_PROVISIONING)
    deadline = time.monotonic() + PATCH_PROVISIONING_SETTLE_TIMEOUT_SECONDS
    # Read once up front so a non-positive timeout still returns the current
    # record instead of an unbound name.
    connection = db_instance.get_value_from_db(MongoCollections.CONNECTIONS, service_id)

    while time.monotonic() < deadline:
        connection = db_instance.get_value_from_db(
            MongoCollections.CONNECTIONS, service_id
        )
        if not connection:
            return None

        if connection.get("status") != under_provisioning:
            if not connection.get("partial_cleanup_requested"):
                return connection
            # A partial cleanup was requested: only treat the connection as
            # settled once every OXP in the stored breakdown has answered.
            oxp_response = connection.get("oxp_response") or {}
            breakdown = db_instance.get_value_from_db(
                MongoCollections.BREAKDOWNS, service_id
            )
            expected_oxp_responses = len(breakdown) if breakdown else 0
            if expected_oxp_responses and len(oxp_response) >= expected_oxp_responses:
                return connection

        time.sleep(PATCH_PROVISIONING_SETTLE_POLL_SECONDS)

    return connection
77

78

79
def delete_connection(service_id):
    """
    Delete connection order by ID.

    The connection is torn down through the connection handler, and then both
    its connection record and its breakdown record are marked deleted. A
    non-2xx teardown result is only logged; the records are still marked
    deleted afterwards.

    :param service_id: ID of the connection that needs to be
        deleted
    :type service_id: str

    :return: ("OK", 200) on success, ("Did not find connection", 404) when no
        connection record exists, or ("Failed, reason: ...", 500) if any step
        raises.
    """
    logger.info(
        f"Handling delete (service id: {service_id}) "
        f"with te_manager: {current_app.te_manager}"
    )

    # Direct lookup by UUID does not seem to work yet; see
    # https://github.com/atlanticwave-sdx/sdx-controller/issues/252.
    record_key = f"{service_id}"
    try:
        # TODO: pce's unreserve_vlan() method silently returns even if the
        # service_id is not found.  This should in fact be an error.
        # https://github.com/atlanticwave-sdx/pce/issues/180
        existing = db_instance.get_value_from_db(
            MongoCollections.CONNECTIONS, record_key
        )
        if not existing:
            return "Did not find connection", 404

        logger.info(f"connection: {existing} {type(existing)}")
        logger.info(f"Removing connection: {service_id} {existing.get('status')}")

        teardown_reason, teardown_code = connection_handler.remove_connection(
            current_app.te_manager, service_id, "API"
        )
        if teardown_code // 100 != 2:
            # Best effort: record the failure but still mark the records deleted.
            logger.info(
                f"Delete failed (connection id: {service_id}): "
                f"reason='{teardown_reason}', code={teardown_code}"
            )

        for collection in (MongoCollections.CONNECTIONS, MongoCollections.BREAKDOWNS):
            db_instance.mark_deleted(collection, record_key)
    except Exception as err:
        logger.info(f"Delete failed (connection id: {service_id}): {err}")
        return f"Failed, reason: {err}", 500

    return "OK", 200
133

134

135
def get_connection_by_id(service_id):
    """
    Find connection by ID.

    :param service_id: ID of connection that needs to be fetched
    :type service_id: str

    :return: Whatever ``get_connection_status`` returns for the ID, or
        ("Connection not found", 404) when that result is empty.
    :rtype: Connection
    """
    found = get_connection_status(db_instance, service_id)
    return found if found else ("Connection not found", 404)
151

152

153
def get_connections():  # noqa: E501
    """
    List all connections

    connection details # noqa: E501

    The first key of each stored entry is treated as its service ID, and the
    status of every ID is looked up individually. IDs whose status lookup
    comes back empty are left out of the result.

    :return: dict mapping service ID to its connection status, or
        ("No connection was found", 404) when the collection is empty.
    :rtype: Connection
    """
    entries = db_instance.get_all_entries_in_collection(MongoCollections.CONNECTIONS)
    if not entries:
        return "No connection was found", 404

    statuses = {}
    for entry in entries:
        conn_id = next(iter(entry))
        logger.info(f"service_id: {conn_id}")
        looked_up = get_connection_status(db_instance, conn_id)
        if not looked_up:
            continue
        statuses[conn_id] = looked_up.get(conn_id)
    return statuses
172

173

174
def get_archived_connections():
    """
    List all archived connections.

    The first key of each historical entry is treated as its service ID, and
    that ID's archived events are fetched through the connection handler.

    :return: dict mapping service ID to its archived events, or
        ("No archived connection was found", 404) when there are no historical
        entries or none of them has archived events.
    :rtype: dict
    """
    not_found = ("No archived connection was found", 404)

    entries = db_instance.get_all_entries_in_collection(
        MongoCollections.HISTORICAL_CONNECTIONS
    )
    if not entries:
        return not_found

    archived = {}
    for entry in entries:
        conn_id = next(iter(entry))
        events = connection_handler.get_archived_connections(conn_id)
        if events:
            archived[conn_id] = events

    return archived or not_found
196

197

198
def place_connection(body):
    """
    Place an connection request from the SDX-Controller.

    The JSON request is given an ID (generated when absent), a REQUESTED status
    and fresh provisioning-tracking fields. It is then saved to the connections
    collection and handed to the connection handler. A non-2xx handler result
    moves the connection to REJECTED.

    :param body: order placed for creating a connection; only logged -- the
        request JSON is re-read from ``connexion.request``
    :type body: dict | bytes

    :return: (response dict with service_id/status/reason, handler code), or
        ("Request body must be JSON", 400) for a non-JSON request.
    :rtype: Connection
    """
    logger.info(f"Placing connection: {body}")
    if not connexion.request.is_json:
        return "Request body must be JSON", 400

    request_body = connexion.request.get_json()
    logger.info(f"Gathered connexion JSON: {request_body}")
    logger.info("Placing connection. Saving to database.")

    service_id = request_body.get("id")
    if service_id is None:
        service_id = str(uuid.uuid4())
        request_body["id"] = service_id
        logger.info(f"Request has no ID. Generated ID: {service_id}")

    conn_status = ConnectionStateMachine.State.REQUESTED
    request_body.update(
        status=str(conn_status),
        # used in lc_message_handler to count the oxp success response
        oxp_success_count=0,
        partial_cleanup_requested=False,
        provisioning_timeout_handled=False,
        provisioning_started_at=time.time(),
    )
    request_body.pop("timeout_reason", None)

    db_instance.add_key_value_pair_to_db(
        MongoCollections.CONNECTIONS, service_id, request_body
    )

    te_manager = current_app.te_manager
    logger.info(f"Handling request {service_id} with te_manager: {te_manager}")
    reason, code = connection_handler.place_connection(te_manager, request_body)

    if code // 100 != 2:
        conn_status = ConnectionStateMachine.State.REJECTED
        request_body, _ = connection_state_machine(request_body, conn_status)
        db_instance.update_field_in_json(
            MongoCollections.CONNECTIONS,
            service_id,
            "status",
            str(conn_status),
        )
    else:
        logger.info(
            f"place_connection succeeds: ID: {service_id} body='{request_body}'"
        )

    logger.info(
        f"place_connection result: ID: {service_id} reason='{reason}', code={code}"
    )

    # Report the status actually stored, which the handler may have advanced;
    # fall back to the locally tracked status if nothing is stored.
    stored = db_instance.get_value_from_db(
        MongoCollections.CONNECTIONS, f"{service_id}"
    )
    fallback_status = str(conn_status)
    raw_status = stored.get("status", fallback_status) if stored else fallback_status

    # TODO: the response is supposed to be shaped like the request
    # ('#/components/schemas/connection');
    # https://github.com/atlanticwave-sdx/sdx-controller/issues/251
    response = {
        "service_id": service_id,
        "status": parse_conn_status(raw_status),
        "reason": reason,
    }
    return response, code
288

289

290
def _patch_validation_error_code(request_err):
    """Return the HTTP code to report for a rejected PATCH request body.

    Uses the exception's integer ``request_code`` attribute when present.
    Otherwise it parses a trailing "(Code: NNN)" out of the message, falling
    back to 400 when no code can be recovered.
    """
    error_code = getattr(request_err, "request_code", None)
    if isinstance(error_code, int):
        return error_code

    # Backward-compatible fallback for exception strings like "... (Code: 400)".
    err_text = str(request_err)
    if "Code:" not in err_text:
        return 400
    candidate = err_text.split("Code:")[-1].replace(")", "").strip()
    try:
        return int(candidate)
    except (TypeError, ValueError):
        logger.warning(
            f"Could not parse error code from patch validation error: {err_text}"
        )
        return 400


def _patch_set_connection_status(service_id, conn_status):
    """Persist ``conn_status`` as the stored connection's ``status`` field."""
    db_instance.update_field_in_json(
        MongoCollections.CONNECTIONS,
        service_id,
        "status",
        str(conn_status),
    )


def _patch_wait_while_under_provisioning(service_id):
    """Poll until the stored connection is no longer UNDER_PROVISIONING.

    Gives up after ROLLBACK_SETTLE_TIMEOUT_SECONDS (measured with a monotonic
    clock), polling every ROLLBACK_SETTLE_POLL_SECONDS. A missing record counts
    as settled.
    """
    under_provisioning = str(ConnectionStateMachine.State.UNDER_PROVISIONING)
    deadline = time.monotonic() + ROLLBACK_SETTLE_TIMEOUT_SECONDS
    while time.monotonic() < deadline:
        current_conn = db_instance.get_value_from_db(
            MongoCollections.CONNECTIONS, service_id
        )
        current_status = current_conn.get("status") if current_conn else None
        if current_status != under_provisioning:
            return
        time.sleep(ROLLBACK_SETTLE_POLL_SECONDS)


def patch_connection(service_id, body=None):  # noqa: E501
    """Edit and change an existing L2vpn connection by ID from the SDX-Controller

     # noqa: E501

    The PATCH body is validated with the TE manager before anything is
    changed. The existing connection is then removed, and the merged
    (stored + patched) request is re-placed. If the re-placement is rejected,
    or settles into a state other than UP, the last successful request is
    re-placed as a rollback. A connection still UNDER_PROVISIONING when the
    settle timeout expires is reported as accepted.

    :param service_id: ID of l2vpn connection that needs to be changed
    :type service_id: str
    :param body: unused; the patch JSON is read from ``connexion.request``
    :type body: dict | bytes

    :return: (response, code). The code is 201 when the patched connection is
        UP or still provisioning. It is 400 when the patch failed but the
        previous connection was restored. It is 500 (or the handler's code)
        when the rollback itself did not recover the service. Validation and
        removal failures return earlier with their own codes.
    :rtype: Connection
    """
    body = db_instance.get_value_from_db(MongoCollections.CONNECTIONS, f"{service_id}")
    if not body:
        return "Connection not found", 404

    if not connexion.request.is_json:
        return "Request body must be JSON", 400

    new_body = connexion.request.get_json()

    logger.info(f"Gathered connexion JSON: {new_body}")

    if "id" not in new_body:
        new_body["id"] = service_id

    # Validate the new request body before making any change to the existing
    # connection, so an invalid patch can never leave the original connection
    # removed but not re-created. The patch body shares place_connection's
    # schema, so the same traffic-matrix validation applies.
    te_manager = current_app.te_manager
    try:
        traffic_matrix = te_manager.generate_traffic_matrix(connection_request=new_body)
    except Exception as request_err:
        logger.error("ERROR: invalid patch request: " + str(request_err))
        error_code = _patch_validation_error_code(request_err)
        return f"Error: patch request is not valid: {request_err}", error_code
    if traffic_matrix is None:
        return (
            "Error: patch request is not valid: "
            "Request does not have a valid JSON or body is incomplete/incorrect",
            400,
        )

    logger.info("Modifying connection")
    # Preserve the last successful request so rollback can recreate it cleanly.
    # Strip rollback bookkeeping left by an earlier patch; otherwise each
    # patch would nest the previous snapshot inside the new one and the
    # stored document would grow without bound.
    rollback_conn_body = copy.deepcopy(body)
    for stale_key in (
        "rollback_performed_for_failed_patch",
        "rollback_request",
        "rollback_in_progress",
    ):
        rollback_conn_body.pop(stale_key, None)
    body.update(new_body)

    conn_status = ConnectionStateMachine.State.MODIFYING
    body, _ = connection_state_machine(body, conn_status)
    _patch_set_connection_status(service_id, conn_status)

    try:
        logger.info("Removing connection")
        remove_conn_reason, remove_conn_code = connection_handler.remove_connection(
            te_manager, service_id, "API"
        )

        if remove_conn_code // 100 != 2:
            conn_status = ConnectionStateMachine.State.DOWN
            body, _ = connection_state_machine(body, conn_status)
            _patch_set_connection_status(service_id, conn_status)
            response = {
                "service_id": service_id,
                "status": parse_conn_status(body["status"]),
                "reason": f"Failure to modify L2VPN during removal: {remove_conn_reason}",
            }
            return response, remove_conn_code

        logger.info(f"Removed connection: {service_id}")
    except Exception as e:
        logger.info(f"Delete failed (connection id: {service_id}): {e}")
        conn_status = ConnectionStateMachine.State.DOWN
        body, _ = connection_state_machine(body, conn_status)
        _patch_set_connection_status(service_id, conn_status)
        return f"Failed, reason: {e}", 500

    # Fixed pause between removal and re-placement.
    # NOTE(review): presumably gives OXP teardown time to finish; this blocks
    # the request for 10 s -- consider making it configurable.
    time.sleep(10)
    logger.info(
        f"Modifying: Placing new connection {service_id} with te_manager: {te_manager}"
    )
    # Reset: remove_connection archives/deletes the original entry,
    # so persist the patched request before re-placement.
    body["status"] = str(ConnectionStateMachine.State.REQUESTED)
    body["oxp_success_count"] = 0
    body["oxp_response"] = {}
    body["rollback_on_failure"] = True
    body["rollback_request"] = rollback_conn_body
    body["rollback_in_progress"] = False
    body.pop("rollback_performed_for_failed_patch", None)
    db_instance.add_key_value_pair_to_db(MongoCollections.CONNECTIONS, service_id, body)
    reason, code = connection_handler.place_connection(te_manager, body)

    if code // 100 == 2:
        patched_conn = _wait_for_patch_provisioning_to_settle(service_id)
        patched_status = patched_conn.get("status") if patched_conn else None
        if patched_status == str(ConnectionStateMachine.State.UP):
            if patched_conn.get("rollback_performed_for_failed_patch"):
                # UP only because the previous request was already restored.
                response = {
                    "service_id": service_id,
                    "status": parse_conn_status(patched_status),
                    "reason": "Failure, rolled back to last successful L2VPN: "
                    "Patched connection provisioning failed",
                }
                return response, 400
            code = 201
            logger.info(f"Placed: ID: {service_id} reason='{reason}', code={code}")
            response = {
                "service_id": service_id,
                "status": parse_conn_status(patched_status),
                "reason": reason,
            }
            return response, code
        if patched_status == str(ConnectionStateMachine.State.UNDER_PROVISIONING):
            code = 201
            logger.info(
                f"Patch placement for {service_id} is still under provisioning; "
                "returning current state."
            )
            response = {
                "service_id": service_id,
                "status": parse_conn_status(patched_status),
                "reason": reason,
            }
            return response, code

        reason = f"Patched connection provisioning failed: {patched_status}"
        code = 400

    logger.info(
        f"Modifying: Failed to place new connection. ID: {service_id} reason='{reason}', code={code}"
    )
    logger.info("Rolling back to old connection.")

    # The patched placement failed, so re-place the original connection request
    # with all provisioning/rollback bookkeeping reset.
    conn_request = rollback_conn_body
    conn_request["status"] = str(ConnectionStateMachine.State.REQUESTED)
    # used in lc_message_handler to count the oxp success response
    conn_request["oxp_success_count"] = 0
    conn_request["oxp_response"] = {}
    conn_request["id"] = service_id
    conn_request["late_cleanup_domains"] = []
    conn_request["partial_cleanup_requested"] = False
    conn_request["rollback_on_failure"] = False
    conn_request["rollback_performed_for_failed_patch"] = True
    conn_request.pop("rollback_request", None)
    conn_request.pop("rollback_in_progress", None)
    conn_request["provisioning_timeout_handled"] = False
    conn_request["provisioning_started_at"] = time.time()
    conn_request.pop("timeout_reason", None)
    db_instance.add_key_value_pair_to_db(
        MongoCollections.CONNECTIONS, service_id, conn_request
    )

    rollback_conn_reason = "Rollback attempt did not complete"
    rollback_recovered = False
    try:
        rollback_conn_reason, rollback_conn_code = connection_handler.place_connection(
            te_manager, conn_request
        )
        if rollback_conn_code // 100 == 2:
            rollback_conn = _wait_for_patch_provisioning_to_settle(service_id)
            rollback_status = rollback_conn.get("status") if rollback_conn else None
            if rollback_status == str(ConnectionStateMachine.State.UP):
                rollback_recovered = True
                # Still return the patch failure code to indicate the requested
                # modification failed, even though rollback succeeded.
                rollback_conn_code = code
            else:
                rollback_conn_reason = (
                    f"Rollback provisioning did not recover service: {rollback_status}"
                )
                rollback_conn_code = 500
        else:
            conn_status = ConnectionStateMachine.State.REJECTED
            body, _ = connection_state_machine(body, conn_status)
            _patch_set_connection_status(service_id, conn_status)
            _patch_wait_while_under_provisioning(service_id)
        logger.info(
            f"Roll back connection result: ID: {service_id} reason='{rollback_conn_reason}', code={rollback_conn_code}"
        )
    except Exception as e:
        rollback_recovered = False
        _patch_set_connection_status(
            service_id, ConnectionStateMachine.State.REJECTED
        )
        logger.info(f"Rollback failed (connection id: {service_id}): {e}")
        rollback_conn_reason = f"Rollback failed: {e}"
        rollback_conn_code = 500

    # Only claim a successful rollback when the restored connection is UP;
    # otherwise surface why the rollback failed instead of hiding it in logs.
    if rollback_recovered:
        response_code = code
        response_reason = f"Failure, rolled back to last successful L2VPN: {reason}"
    else:
        response_code = rollback_conn_code
        response_reason = (
            "Failure to modify L2VPN and rollback to last successful L2VPN "
            f"failed: {reason}; rollback: {rollback_conn_reason}"
        )

    current_conn = db_instance.get_value_from_db(
        MongoCollections.CONNECTIONS, f"{service_id}"
    )
    response = {
        "service_id": service_id,
        "reason": response_reason,
        "status": parse_conn_status(
            current_conn.get("status", "") if current_conn else ""
        ),
    }
    return response, response_code
544

545

546
def get_archived_connections_by_id(service_id):
    """
    List archived connection by ID.

    :param service_id: ID of connection that needs to be fetched
    :type service_id: str

    :return: {service_id: archived events} when the connection handler finds
        any, otherwise ("Archived connection not found", 404).
    :rtype: Connection
    """
    events = connection_handler.get_archived_connections(service_id)
    if events:
        return {service_id: events}
    return "Archived connection not found", 404
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc