• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

atlanticwave-sdx / sdx-controller / 25141814280

30 Apr 2026 12:57AM UTC coverage: 50.292% (-2.5%) from 52.799%
25141814280

Pull #524

github

web-flow
Merge f97220711 into 26d01cbb9
Pull Request #524: Rollback on failed OXP POST

112 of 359 new or added lines in 5 files covered. (31.2%)

141 existing lines in 4 files now uncovered.

1379 of 2742 relevant lines covered (50.29%)

1.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

41.56
/sdx_controller/controllers/l2vpn_controller.py
1
import copy
2✔
2
import logging
2✔
3
import os
2✔
4
import time
2✔
5
import uuid
2✔
6

7
import connexion
2✔
8
from flask import current_app
2✔
9
from sdx_datamodel.connection_sm import ConnectionStateMachine
2✔
10
from sdx_datamodel.constants import MongoCollections
2✔
11

12
from sdx_controller.handlers.connection_handler import (
2✔
13
    ConnectionHandler,
14
    connection_state_machine,
15
    get_connection_status,
16
    parse_conn_status,
17
)
18

19
# from sdx_controller.models.l2vpn_service_id_body import L2vpnServiceIdBody  # noqa: E501
20
from sdx_controller.utils.db_utils import DbUtils
2✔
21

22
LOG_FORMAT = (
    "%(levelname) -10s %(asctime)s %(name) -30s %(funcName) "
    "-35s %(lineno) -5d: %(message)s"
)
logger = logging.getLogger(__name__)
logging.getLogger("pika").setLevel(logging.WARNING)
logger.setLevel(logging.getLevelName(os.getenv("LOG_LEVEL", "DEBUG")))

# When re-placing the pre-patch request is rejected, patch_connection waits up
# to this many seconds for the stored status to leave UNDER_PROVISIONING,
# re-reading the database every ROLLBACK_SETTLE_POLL_SECONDS.
ROLLBACK_SETTLE_TIMEOUT_SECONDS = float(
    os.getenv("ROLLBACK_SETTLE_TIMEOUT_SECONDS", "5")
)
ROLLBACK_SETTLE_POLL_SECONDS = float(os.getenv("ROLLBACK_SETTLE_POLL_SECONDS", "0.2"))

# PATCH must wait for async OXP provisioning responses before deciding whether
# the new service is really up or needs rollback to the previous request.
# Parsed as float (like the rollback settings above) so sub-second overrides
# such as "0.5" are accepted instead of failing module import with ValueError;
# integer strings like "10" keep working unchanged.
PATCH_PROVISIONING_SETTLE_TIMEOUT_SECONDS = float(
    os.getenv("PATCH_PROVISIONING_SETTLE_TIMEOUT_SECONDS", "10")
)
PATCH_PROVISIONING_SETTLE_POLL_SECONDS = float(
    os.getenv("PATCH_PROVISIONING_SETTLE_POLL_SECONDS", "1")
)
42

43
# Persistence objects shared by every handler in this module: a single
# DbUtils client, initialized once when the module is imported, and the
# ConnectionHandler built on top of it.
db_instance = DbUtils()
db_instance.initialize_db()
connection_handler = ConnectionHandler(db_instance)
2✔
47

48

49
def _wait_for_patch_provisioning_to_settle(service_id):
    """
    Poll the stored connection until asynchronous OXP provisioning settles.

    The connection record is re-read every PATCH_PROVISIONING_SETTLE_POLL_SECONDS
    until PATCH_PROVISIONING_SETTLE_TIMEOUT_SECONDS have elapsed. The record
    counts as settled once its status is no longer UNDER_PROVISIONING, unless
    ``partial_cleanup_requested`` is set and either the stored breakdown is
    missing/empty or fewer OXP responses have been recorded than the breakdown
    has entries (presumably one per OXP domain -- confirm). In that case
    polling continues so late responses can be recorded first.

    :param service_id: ID of the connection to watch.
    :type service_id: str

    :return: The settled connection document; the last document read if the
        timeout expires first; or None if the connection disappears from the
        database while polling.
    :rtype: dict | None
    """
    # Monotonic clock: wall-clock adjustments (NTP steps, manual changes) must
    # not stretch or cut short the settle window.
    deadline = time.monotonic() + PATCH_PROVISIONING_SETTLE_TIMEOUT_SECONDS
    # Read once up front so there is something to return even when the
    # configured timeout is zero and the loop body never runs.
    connection = db_instance.get_value_from_db(MongoCollections.CONNECTIONS, service_id)

    while time.monotonic() < deadline:
        connection = db_instance.get_value_from_db(
            MongoCollections.CONNECTIONS, service_id
        )
        if not connection:
            return None

        status = connection.get("status")
        if status != str(ConnectionStateMachine.State.UNDER_PROVISIONING):
            oxp_response = connection.get("oxp_response") or {}
            breakdown = db_instance.get_value_from_db(
                MongoCollections.BREAKDOWNS, service_id
            )
            expected_oxp_responses = len(breakdown) if breakdown else 0
            awaiting_late_responses = connection.get("partial_cleanup_requested") and (
                not expected_oxp_responses
                or len(oxp_response) < expected_oxp_responses
            )
            if not awaiting_late_responses:
                return connection

        time.sleep(PATCH_PROVISIONING_SETTLE_POLL_SECONDS)

    return connection
×
81

82

83
def delete_connection(service_id):
    """
    Delete a connection by its service ID.

    The stored connection is looked up first. When present, the connection
    handler is asked to remove it. The connection and its breakdown are then
    marked as deleted in the database. A non-2xx removal result is only
    logged: the records are still marked deleted and "OK" is returned.

    :param service_id: ID of the connection that needs to be deleted
    :type service_id: str

    :return: ("OK", 200) on completion, ("Did not find connection", 404) when
        no such connection is stored, or ("Failed, reason: ...", 500) when an
        exception is raised during lookup/removal.
    :rtype: tuple
    """
    te_manager = current_app.te_manager
    logger.info(
        f"Handling delete (service id: {service_id}) "
        f"with te_manager: {te_manager}"
    )

    # NOTE: looking connections up by UUID did not seem to work yet; see
    # https://github.com/atlanticwave-sdx/sdx-controller/issues/252.
    # TODO: pce's unreserve_vlan() silently returns even if the service_id is
    # not found; that should be an error.
    # https://github.com/atlanticwave-sdx/pce/issues/180
    db_key = f"{service_id}"
    try:
        stored = db_instance.get_value_from_db(MongoCollections.CONNECTIONS, db_key)
        if not stored:
            return "Did not find connection", 404

        logger.info(f"connection: {stored} {type(stored)}")
        logger.info(f"Removing connection: {service_id} {stored.get('status')}")

        reason, code = connection_handler.remove_connection(
            te_manager, service_id, "API"
        )
        if code // 100 != 2:
            # A failed removal is only logged; the records below are still
            # marked deleted.
            logger.info(
                f"Delete failed (connection id: {service_id}): "
                f"reason='{reason}', code={code}"
            )

        for collection in (MongoCollections.CONNECTIONS, MongoCollections.BREAKDOWNS):
            db_instance.mark_deleted(collection, db_key)
    except Exception as e:
        logger.info(f"Delete failed (connection id: {service_id}): {e}")
        return f"Failed, reason: {e}", 500

    return "OK", 200
2✔
137

138

139
def get_connection_by_id(service_id):
    """
    Fetch one connection by its service ID.

    :param service_id: ID of connection that needs to be fetched
    :type service_id: str

    :return: Whatever get_connection_status() builds for the ID, or
        ("Connection not found", 404) when it returns nothing.
    :rtype: Connection
    """
    status = get_connection_status(db_instance, service_id)
    return status if status else ("Connection not found", 404)
2✔
155

156

157
def get_connections():  # noqa: E501
    """
    List all connections.

    Every entry in the connections collection is identified by its first key,
    which is used as the service ID. The status for that ID is looked up with
    get_connection_status(), and entries with no status are skipped.

    :return: Mapping of service ID to connection status, or
        ("No connection was found", 404) when the collection is empty.
    :rtype: Connection
    """
    entries = db_instance.get_all_entries_in_collection(MongoCollections.CONNECTIONS)
    if not entries:
        return "No connection was found", 404

    statuses = {}
    for entry in entries:
        conn_id = next(iter(entry))
        logger.info(f"service_id: {conn_id}")
        status = get_connection_status(db_instance, conn_id)
        if not status:
            continue
        statuses[conn_id] = status.get(conn_id)
    return statuses
2✔
176

177

178
def get_archived_connections():
    """
    List the archived events of every connection in the historical collection.

    :return: Mapping of service ID to its archived events, or
        ("No archived connection was found", 404) when the historical
        collection is empty or none of its entries has archived events.
    :rtype: dict
    """
    not_found = ("No archived connection was found", 404)
    history = db_instance.get_all_entries_in_collection(
        MongoCollections.HISTORICAL_CONNECTIONS
    )
    if not history:
        return not_found

    archived = {}
    for entry in history:
        conn_id = next(iter(entry))
        events = connection_handler.get_archived_connections(conn_id)
        if events:
            archived[conn_id] = events

    return archived if archived else not_found
2✔
200

201

202
def place_connection(body):
    """
    Place a connection request from the SDX-Controller.

    The JSON payload is re-read from the current connexion request. It is
    stamped with:

    * a service ID (a fresh UUID4 when the payload has no "id");
    * the REQUESTED status;
    * reset provisioning bookkeeping.

    The payload is then stored and handed to the connection handler. A
    non-2xx result from the handler moves the stored connection to REJECTED.

    :param body: order placed for creating a connection (only logged; the
        payload actually used comes from the connexion request)
    :type body: dict | bytes

    :return: ({"service_id", "status", "reason"}, code) with the handler's
        result code, or ("Request body must be JSON", 400).
    :rtype: Connection
    """
    logger.info(f"Placing connection: {body}")
    if not connexion.request.is_json:
        return "Request body must be JSON", 400

    request_body = connexion.request.get_json()
    logger.info(f"Gathered connexion JSON: {request_body}")
    logger.info("Placing connection. Saving to database.")

    service_id = request_body.get("id")
    if service_id is None:
        service_id = str(uuid.uuid4())
        request_body["id"] = service_id
        logger.info(f"Request has no ID. Generated ID: {service_id}")

    conn_status = ConnectionStateMachine.State.REQUESTED
    request_body.update(
        {
            "status": str(conn_status),
            # lc_message_handler counts successful OXP responses here.
            "oxp_success_count": 0,
            "partial_cleanup_requested": False,
            "provisioning_timeout_handled": False,
            "provisioning_started_at": time.time(),
        }
    )
    request_body.pop("timeout_reason", None)
    db_instance.add_key_value_pair_to_db(
        MongoCollections.CONNECTIONS, service_id, request_body
    )

    te_manager = current_app.te_manager
    logger.info(f"Handling request {service_id} with te_manager: {te_manager}")
    reason, code = connection_handler.place_connection(te_manager, request_body)

    if code // 100 != 2:
        conn_status = ConnectionStateMachine.State.REJECTED
        request_body, _ = connection_state_machine(request_body, conn_status)
        db_instance.update_field_in_json(
            MongoCollections.CONNECTIONS, service_id, "status", str(conn_status)
        )
    else:
        logger.info(
            f"place_connection succeeds: ID: {service_id} body='{request_body}'"
        )
    logger.info(
        f"place_connection result: ID: {service_id} reason='{reason}', code={code}"
    )

    stored = db_instance.get_value_from_db(
        MongoCollections.CONNECTIONS, f"{service_id}"
    )
    fallback_status = str(conn_status)
    current_status = (
        stored.get("status", fallback_status) if stored else fallback_status
    )

    # TODO: the response should be shaped like the request
    # ('#/components/schemas/connection'); see
    # https://github.com/atlanticwave-sdx/sdx-controller/issues/251
    return (
        {
            "service_id": service_id,
            "status": parse_conn_status(current_status),
            "reason": reason,
        },
        code,
    )
2✔
292

293

294
# Seconds patch_connection pauses after removing the old service and before
# placing the patched request (previously hard-coded to 10).
PATCH_REMOVAL_SETTLE_SECONDS = float(os.getenv("PATCH_REMOVAL_SETTLE_SECONDS", "10"))


def _patch_validation_error_code(request_err):
    """
    Derive the HTTP status code to return for a PATCH body that failed validation.

    Prefers an integer ``request_code`` attribute on the exception. Otherwise it
    parses the number following "Code:" in the exception text. It falls back to
    400 when neither is available.

    :param request_err: Exception raised while validating the request.
    :rtype: int
    """
    error_code = getattr(request_err, "request_code", None)
    if isinstance(error_code, int):
        return error_code

    # Backward-compatible fallback for exception strings like "... (Code: 400)".
    error_code = 400
    err_text = str(request_err)
    if "Code:" in err_text:
        candidate = err_text.split("Code:")[-1].replace(")", "").strip()
        try:
            error_code = int(candidate)
        except (TypeError, ValueError):
            logger.warning(
                f"Could not parse error code from patch validation error: {err_text}"
            )
    return error_code


def _build_rollback_request(previous_body, service_id):
    """
    Turn the pre-patch connection document into a fresh placement request.

    Resets the status to REQUESTED and clears the per-attempt provisioning
    bookkeeping, so the rollback is tracked like a brand-new placement.

    :param previous_body: Copy of the connection as stored before the patch;
        it is mutated in place and returned.
    :param service_id: ID the rolled-back connection keeps.
    :rtype: dict
    """
    previous_body["status"] = str(ConnectionStateMachine.State.REQUESTED)
    # used in lc_message_handler to count the oxp success response
    previous_body["oxp_success_count"] = 0
    previous_body["oxp_response"] = {}
    previous_body["id"] = service_id
    previous_body["late_cleanup_domains"] = []
    previous_body["partial_cleanup_requested"] = False
    previous_body["provisioning_timeout_handled"] = False
    previous_body["provisioning_started_at"] = time.time()
    previous_body.pop("timeout_reason", None)
    return previous_body


def _wait_for_rollback_to_leave_provisioning(service_id):
    """
    Wait briefly for a rejected rollback's stored status to leave UNDER_PROVISIONING.

    Re-reads the connection every ROLLBACK_SETTLE_POLL_SECONDS for at most
    ROLLBACK_SETTLE_TIMEOUT_SECONDS. It returns as soon as the status is
    anything other than UNDER_PROVISIONING, including when the record is missing.
    """
    # Monotonic clock so wall-clock adjustments cannot distort the wait.
    deadline = time.monotonic() + ROLLBACK_SETTLE_TIMEOUT_SECONDS
    under_provisioning = str(ConnectionStateMachine.State.UNDER_PROVISIONING)
    while time.monotonic() < deadline:
        current_conn = db_instance.get_value_from_db(
            MongoCollections.CONNECTIONS, service_id
        )
        current_status = current_conn.get("status") if current_conn else None
        if current_status != under_provisioning:
            return
        time.sleep(ROLLBACK_SETTLE_POLL_SECONDS)


def patch_connection(service_id, body=None):  # noqa: E501
    """Edit and change an existing L2vpn connection by ID from the SDX-Controller

    The patch proceeds in four steps:

    1. Validate the new request by generating a traffic matrix. An invalid
       body therefore never tears down the existing service.
    2. Remove the old service.
    3. Place the patched request and wait for asynchronous OXP provisioning
       to settle.
    4. If placement fails, or the patched service ends in any state other
       than UP or UNDER_PROVISIONING, place the last successful request
       again (rollback).

     # noqa: E501

    :param service_id: ID of l2vpn connection that needs to be changed
    :type service_id: str
    :param body: Ignored; the patch payload is read from the current
        connexion request.
    :type body: dict | bytes

    :return: (response, code).

        * On success, ``code`` is 201 and the response holds ``service_id``,
          ``status`` and ``reason``.
        * After a failed placement, the response reason states whether the
          rollback succeeded.
        * ``code`` is the patch failure code when the rollback succeeded,
          otherwise the rollback failure code.
    :rtype: Connection
    """
    body = db_instance.get_value_from_db(MongoCollections.CONNECTIONS, f"{service_id}")
    if not body:
        return "Connection not found", 404

    if not connexion.request.is_json:
        return "Request body must be JSON", 400

    new_body = connexion.request.get_json()
    logger.info(f"Gathered connexion JSON: {new_body}")

    # get_json() may return a list, scalar or None. Those cannot be merged into
    # the stored connection, so reject them instead of raising TypeError below.
    if not isinstance(new_body, dict):
        return (
            "Error: patch request is not valid: Request body must be a JSON object",
            400,
        )

    if "id" not in new_body:
        new_body["id"] = service_id

    # Validate the new request body before making any change to the existing
    # connection. Otherwise an invalid body could leave the original connection
    # removed but never re-created. This is the same validation that
    # place_connection relies on, since both share the request schema.
    te_manager = current_app.te_manager
    try:
        traffic_matrix = te_manager.generate_traffic_matrix(connection_request=new_body)
    except Exception as request_err:
        logger.error("ERROR: invalid patch request: " + str(request_err))
        return (
            f"Error: patch request is not valid: {request_err}",
            _patch_validation_error_code(request_err),
        )
    if traffic_matrix is None:
        return (
            "Error: patch request is not valid: "
            "Request does not have a valid JSON or body is incomplete/incorrect",
            400,
        )

    logger.info("Modifying connection")
    # Preserve the last successful request so rollback can recreate it cleanly.
    rollback_conn_body = copy.deepcopy(body)
    body.update(new_body)

    conn_status = ConnectionStateMachine.State.MODIFYING
    body, _ = connection_state_machine(body, conn_status)
    db_instance.update_field_in_json(
        MongoCollections.CONNECTIONS, service_id, "status", str(conn_status)
    )

    try:
        logger.info("Removing connection")
        remove_conn_reason, remove_conn_code = connection_handler.remove_connection(
            te_manager, service_id, "API"
        )
        if remove_conn_code // 100 != 2:
            conn_status = ConnectionStateMachine.State.DOWN
            body, _ = connection_state_machine(body, conn_status)
            db_instance.update_field_in_json(
                MongoCollections.CONNECTIONS, service_id, "status", str(conn_status)
            )
            response = {
                "service_id": service_id,
                "status": parse_conn_status(body["status"]),
                "reason": f"Failure to modify L2VPN during removal: {remove_conn_reason}",
            }
            return response, remove_conn_code

        logger.info(f"Removed connection: {service_id}")
    except Exception as e:
        logger.info(f"Delete failed (connection id: {service_id}): {e}")
        conn_status = ConnectionStateMachine.State.DOWN
        body, _ = connection_state_machine(body, conn_status)
        db_instance.update_field_in_json(
            MongoCollections.CONNECTIONS, service_id, "status", str(conn_status)
        )
        return f"Failed, reason: {e}", 500

    # NOTE(review): presumably gives the asynchronous teardown time to finish
    # before re-placing -- confirm the expected duration with the OXP side.
    time.sleep(PATCH_REMOVAL_SETTLE_SECONDS)
    logger.info(
        f"Modifying: Placing new connection {service_id} with te_manager: {te_manager}"
    )
    # Reset: remove_connection archives/deletes the original entry,
    # so persist the patched request before re-placement.
    body["status"] = str(ConnectionStateMachine.State.REQUESTED)
    body["oxp_success_count"] = 0
    body["oxp_response"] = {}
    db_instance.add_key_value_pair_to_db(MongoCollections.CONNECTIONS, service_id, body)
    reason, code = connection_handler.place_connection(te_manager, body)

    if code // 100 == 2:
        patched_conn = _wait_for_patch_provisioning_to_settle(service_id)
        patched_status = patched_conn.get("status") if patched_conn else None
        if patched_status == str(ConnectionStateMachine.State.UP):
            code = 201
            logger.info(f"Placed: ID: {service_id} reason='{reason}', code={code}")
            response = {
                "service_id": service_id,
                "status": parse_conn_status(patched_status),
                "reason": reason,
            }
            return response, code

        if patched_status == str(ConnectionStateMachine.State.UNDER_PROVISIONING):
            code = 201
            logger.info(
                f"Patch placement for {service_id} is still under provisioning; "
                "returning current state."
            )
            response = {
                "service_id": service_id,
                "status": parse_conn_status(patched_status),
                "reason": reason,
            }
            return response, code

        reason = f"Patched connection provisioning failed: {patched_status}"
        code = 400

    logger.info(
        f"Modifying: Failed to place new connection. ID: {service_id} reason='{reason}', code={code}"
    )
    logger.info("Rolling back to old connection.")

    # The patched placement failed, so re-place the original connection request.
    conn_request = _build_rollback_request(rollback_conn_body, service_id)
    db_instance.add_key_value_pair_to_db(
        MongoCollections.CONNECTIONS, service_id, conn_request
    )

    rollback_succeeded = False
    rollback_conn_reason = "Rollback attempt did not complete"
    try:
        rollback_conn_reason, rollback_conn_code = connection_handler.place_connection(
            te_manager, conn_request
        )
        if rollback_conn_code // 100 == 2:
            rollback_conn = _wait_for_patch_provisioning_to_settle(service_id)
            rollback_status = rollback_conn.get("status") if rollback_conn else None
            if rollback_status == str(ConnectionStateMachine.State.UP):
                rollback_succeeded = True
                # Still return the patch failure code to indicate the requested
                # modification failed, even though rollback succeeded.
                rollback_conn_code = code
            else:
                rollback_conn_reason = (
                    f"Rollback provisioning did not recover service: {rollback_status}"
                )
                rollback_conn_code = 500
        else:
            # Reject the request that was just refused (the rollback request),
            # mirroring how place_connection rejects the body it handed over.
            conn_status = ConnectionStateMachine.State.REJECTED
            conn_request, _ = connection_state_machine(conn_request, conn_status)
            db_instance.update_field_in_json(
                MongoCollections.CONNECTIONS, service_id, "status", str(conn_status)
            )
            _wait_for_rollback_to_leave_provisioning(service_id)
        logger.info(
            f"Roll back connection result: ID: {service_id} reason='{rollback_conn_reason}', code={rollback_conn_code}"
        )
    except Exception as e:
        db_instance.update_field_in_json(
            MongoCollections.CONNECTIONS,
            service_id,
            "status",
            str(ConnectionStateMachine.State.REJECTED),
        )
        logger.info(f"Rollback failed (connection id: {service_id}): {e}")
        rollback_succeeded = False
        rollback_conn_reason = f"Rollback failed: {e}"
        rollback_conn_code = 500

    response_code = code if rollback_succeeded else rollback_conn_code
    if rollback_succeeded:
        response_reason = f"Failure, rolled back to last successful L2VPN: {reason}"
    else:
        # Do not claim a rollback that did not happen; surface why it failed.
        response_reason = (
            f"Failure, rollback to last successful L2VPN also failed: {reason}; "
            f"rollback: {rollback_conn_reason}"
        )

    current_conn = db_instance.get_value_from_db(
        MongoCollections.CONNECTIONS, f"{service_id}"
    )
    response = {
        "service_id": service_id,
        "reason": response_reason,
        "status": parse_conn_status(
            current_conn.get("status", "") if current_conn else ""
        ),
    }
    return response, response_code
×
531

532

533
def get_archived_connections_by_id(service_id):
    """
    Return the archived events recorded for one connection.

    :param service_id: ID of connection that needs to be fetched
    :type service_id: str

    :return: ``{service_id: events}`` when the connection handler reports
        archived events, otherwise ("Archived connection not found", 404).
    :rtype: Connection
    """
    events = connection_handler.get_archived_connections(service_id)
    if events:
        return {service_id: events}
    return "Archived connection not found", 404
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc