• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

atlanticwave-sdx / sdx-controller / 25140616787

30 Apr 2026 12:15AM UTC coverage: 50.831% (-2.0%) from 52.799%
25140616787

Pull #524

github

web-flow
Merge 35f1a4bf3 into 26d01cbb9
Pull Request #524: Rollback on failed OXP POST

103 of 322 new or added lines in 5 files covered. (31.99%)

14 existing lines in 4 files now uncovered.

1376 of 2707 relevant lines covered (50.83%)

1.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

47.12
/sdx_controller/controllers/l2vpn_controller.py
1
import copy
2✔
2
import logging
2✔
3
import os
2✔
4
import time
2✔
5
import uuid
2✔
6

7
import connexion
2✔
8
from flask import current_app
2✔
9
from sdx_datamodel.connection_sm import ConnectionStateMachine
2✔
10
from sdx_datamodel.constants import MongoCollections
2✔
11

12
from sdx_controller.handlers.connection_handler import (
2✔
13
    ConnectionHandler,
14
    connection_state_machine,
15
    get_connection_status,
16
    parse_conn_status,
17
)
18

19
# from sdx_controller.models.l2vpn_service_id_body import L2vpnServiceIdBody  # noqa: E501
20
from sdx_controller.utils.db_utils import DbUtils
2✔
21

22
LOG_FORMAT = (
2✔
23
    "%(levelname) -10s %(asctime)s %(name) -30s %(funcName) "
24
    "-35s %(lineno) -5d: %(message)s"
25
)
26
logger = logging.getLogger(__name__)
2✔
27
logging.getLogger("pika").setLevel(logging.WARNING)
2✔
28
logger.setLevel(logging.getLevelName(os.getenv("LOG_LEVEL", "DEBUG")))
2✔
29
ROLLBACK_SETTLE_TIMEOUT_SECONDS = float(
2✔
30
    os.getenv("ROLLBACK_SETTLE_TIMEOUT_SECONDS", "5")
31
)
32
ROLLBACK_SETTLE_POLL_SECONDS = float(os.getenv("ROLLBACK_SETTLE_POLL_SECONDS", "0.2"))
2✔
33

34
# Get DB connection and tables set up.
35
db_instance = DbUtils()
2✔
36
db_instance.initialize_db()
2✔
37
connection_handler = ConnectionHandler(db_instance)
2✔
38

39

40
def delete_connection(service_id):
2✔
41
    """
42
    Delete connection order by ID.
43

44
    :param service_id: ID of the connection that needs to be
45
        deleted
46
    :type service_id: str
47

48
    :rtype: None
49
    """
50
    logger.info(
2✔
51
        f"Handling delete (service id: {service_id}) "
52
        f"with te_manager: {current_app.te_manager}"
53
    )
54

55
    # # Looking up by UUID do not seem work yet.  Will address in
56
    # # https://github.com/atlanticwave-sdx/sdx-controller/issues/252.
57
    #
58
    # value = db_instance.read_from_db(f"{service_id}")
59
    # print(f"value: {value}")
60
    # if not value:
61
    #     return "Not found", 404
62

63
    try:
2✔
64
        # TODO: pce's unreserve_vlan() method silently returns even if the
65
        # service_id is not found.  This should in fact be an error.
66
        #
67
        # https://github.com/atlanticwave-sdx/pce/issues/180
68
        connection = db_instance.get_value_from_db(
2✔
69
            MongoCollections.CONNECTIONS, f"{service_id}"
70
        )
71

72
        if not connection:
2✔
73
            return "Did not find connection", 404
2✔
74

75
        logger.info(f"connection: {connection} {type(connection)}")
2✔
76
        logger.info(f"Removing connection: {service_id} {connection.get('status')}")
2✔
77

78
        remove_reason, remove_code = connection_handler.remove_connection(
2✔
79
            current_app.te_manager, service_id, "API"
80
        )
81
        if remove_code // 100 != 2:
2✔
UNCOV
82
            logger.info(
×
83
                f"Delete failed (connection id: {service_id}): "
84
                f"reason='{remove_reason}', code={remove_code}"
85
            )
86
            # return remove_reason, remove_code
87
        db_instance.mark_deleted(MongoCollections.CONNECTIONS, f"{service_id}")
2✔
88
        db_instance.mark_deleted(MongoCollections.BREAKDOWNS, f"{service_id}")
2✔
89
    except Exception as e:
×
90
        logger.info(f"Delete failed (connection id: {service_id}): {e}")
×
91
        return f"Failed, reason: {e}", 500
×
92

93
    return "OK", 200
2✔
94

95

96
def get_connection_by_id(service_id):
2✔
97
    """
98
    Find connection by ID.
99

100
    :param service_id: ID of connection that needs to be fetched
101
    :type service_id: str
102

103
    :rtype: Connection
104
    """
105

106
    value = get_connection_status(db_instance, service_id)
2✔
107

108
    if not value:
2✔
109
        return "Connection not found", 404
2✔
110

111
    return value
2✔
112

113

114
def get_connections():  # noqa: E501
2✔
115
    """
116
    List all connections
117

118
    connection details # noqa: E501
119

120
    :rtype: Connection
121
    """
122
    values = db_instance.get_all_entries_in_collection(MongoCollections.CONNECTIONS)
2✔
123
    if not values:
2✔
124
        return "No connection was found", 404
2✔
125
    return_values = {}
2✔
126
    for connection in values:
2✔
127
        service_id = next(iter(connection))
2✔
128
        logger.info(f"service_id: {service_id}")
2✔
129
        connection_status = get_connection_status(db_instance, service_id)
2✔
130
        if connection_status:
2✔
131
            return_values[service_id] = connection_status.get(service_id)
2✔
132
    return return_values
2✔
133

134

135
def get_archived_connections():
2✔
136
    """
137
    List all archived connections.
138

139
    :rtype: dict
140
    """
141
    values = db_instance.get_all_entries_in_collection(
2✔
142
        MongoCollections.HISTORICAL_CONNECTIONS
143
    )
144
    if not values:
2✔
145
        return "No archived connection was found", 404
2✔
146

147
    return_values = {}
2✔
148
    for archived_connection in values:
2✔
149
        service_id = next(iter(archived_connection))
2✔
150
        archived_events = connection_handler.get_archived_connections(service_id)
2✔
151
        if archived_events:
2✔
152
            return_values[service_id] = archived_events
2✔
153

154
    if not return_values:
2✔
155
        return "No archived connection was found", 404
×
156
    return return_values
2✔
157

158

159
def place_connection(body):
2✔
160
    """
161
    Place an connection request from the SDX-Controller.
162

163
    :param body: order placed for creating a connection
164
    :type body: dict | bytes
165

166
    :rtype: Connection
167
    """
168
    logger.info(f"Placing connection: {body}")
2✔
169
    if not connexion.request.is_json:
2✔
170
        return "Request body must be JSON", 400
×
171

172
    body = connexion.request.get_json()
2✔
173
    logger.info(f"Gathered connexion JSON: {body}")
2✔
174

175
    logger.info("Placing connection. Saving to database.")
2✔
176

177
    service_id = body.get("id")
2✔
178

179
    if service_id is None:
2✔
180
        service_id = str(uuid.uuid4())
2✔
181
        body["id"] = service_id
2✔
182
        logger.info(f"Request has no ID. Generated ID: {service_id}")
2✔
183

184
    conn_status = ConnectionStateMachine.State.REQUESTED
2✔
185
    body["status"] = str(conn_status)
2✔
186

187
    # used in lc_message_handler to count the oxp success response
188
    body["oxp_success_count"] = 0
2✔
189
    body["partial_cleanup_requested"] = False
2✔
190
    body["provisioning_timeout_handled"] = False
2✔
191
    body["provisioning_started_at"] = time.time()
2✔
192
    body.pop("timeout_reason", None)
2✔
193

194
    db_instance.add_key_value_pair_to_db(MongoCollections.CONNECTIONS, service_id, body)
2✔
195

196
    logger.info(
2✔
197
        f"Handling request {service_id} with te_manager: {current_app.te_manager}"
198
    )
199
    reason, code = connection_handler.place_connection(current_app.te_manager, body)
2✔
200

201
    if code // 100 == 2:
2✔
202
        # conn_status = ConnectionStateMachine.State.UNDER_PROVISIONING
203
        # body, _ = connection_state_machine(body, conn_status)
204
        # db_instance.update_field_in_json(
205
        #    MongoCollections.CONNECTIONS,
206
        #    service_id,
207
        #    "status",
208
        #    str(conn_status),
209
        # )
210
        logger.info(f"place_connection succeeds: ID: {service_id} body='{body}'")
2✔
211
    else:
212
        conn_status = ConnectionStateMachine.State.REJECTED
2✔
213
        body, _ = connection_state_machine(body, conn_status)
2✔
214
        db_instance.update_field_in_json(
2✔
215
            MongoCollections.CONNECTIONS,
216
            service_id,
217
            "status",
218
            str(conn_status),
219
        )
220
    logger.info(
2✔
221
        f"place_connection result: ID: {service_id} reason='{reason}', code={code}"
222
    )
223

224
    current_conn = db_instance.get_value_from_db(
2✔
225
        MongoCollections.CONNECTIONS, f"{service_id}"
226
    )
227
    response = {
2✔
228
        "service_id": service_id,
229
        "status": parse_conn_status(
230
            current_conn.get("status", str(conn_status))
231
            if current_conn
232
            else str(conn_status)
233
        ),
234
        "reason": reason,
235
    }
236

237
    # # TODO: our response is supposed to be shaped just like request
238
    # # ('#/components/schemas/connection'), and in that case the below
239
    # # code would be a quick implementation.
240
    # #
241
    # # https://github.com/atlanticwave-sdx/sdx-controller/issues/251
242
    # response = body
243

244
    # response["id"] = service_id
245
    # response["status"] = "success" if code == 2xx else "failure"
246
    # response["reason"] = reason # `reason` is not present in schema though.
247

248
    return response, code
2✔
249

250

251
def patch_connection(service_id, body=None):  # noqa: E501
2✔
252
    """Edit and change an existing L2vpn connection by ID from the SDX-Controller
253

254
     # noqa: E501
255

256
    :param service_id: ID of l2vpn connection that needs to be changed
257
    :type service_id: dict | bytes'
258
    :param body:
259
    :type body: dict | bytes
260

261
    :rtype: Connection
262
    """
263
    body = db_instance.get_value_from_db(MongoCollections.CONNECTIONS, f"{service_id}")
×
264
    if not body:
×
265
        return "Connection not found", 404
×
266

267
    if not connexion.request.is_json:
×
268
        return "Request body must be JSON", 400
×
269

270
    new_body = connexion.request.get_json()
×
271

272
    logger.info(f"Gathered connexion JSON: {new_body}")
×
273

274
    if "id" not in new_body:
×
275
        new_body["id"] = service_id
×
276

277
    # Validate the new request body before making any change to the existing connection.
278
    # This is to avoid the case where we have already removed the original connection but the new request body is invalid, which will cause the connection to be deleted but not re-created.
279
    # We can reuse the same validation function used in place_connection since the request body for patch_connection has the same schema as place_connection.
280
    #
281
    te_manager = current_app.te_manager  # Assuming te_manager is accessible like this
×
282
    try:
×
283
        # Validate the new request body
NEW
284
        traffic_matrix = te_manager.generate_traffic_matrix(connection_request=new_body)
×
285
    except Exception as request_err:
×
286
        logger.error("ERROR: invalid patch request: " + str(request_err))
×
287
        error_code = getattr(request_err, "request_code", None)
×
288
        if not isinstance(error_code, int):
×
289
            # Backward-compatible fallback for exception strings like "... (Code: 400)".
290
            error_code = 400
×
291
            err_text = str(request_err)
×
292
            if "Code:" in err_text:
×
293
                candidate = err_text.split("Code:")[-1].replace(")", "").strip()
×
294
                try:
×
295
                    error_code = int(candidate)
×
296
                except (TypeError, ValueError):
×
297
                    logger.warning(
×
298
                        f"Could not parse error code from patch validation error: {err_text}"
299
                    )
300
        return f"Error: patch request is not valid: {request_err}", error_code
×
NEW
301
    if traffic_matrix is None:
×
NEW
302
        return (
×
303
            "Error: patch request is not valid: "
304
            "Request does not have a valid JSON or body is incomplete/incorrect",
305
            400,
306
        )
307

308
    logger.info("Modifying connection")
×
309
    # Preserve the last successful request so rollback can recreate it cleanly.
310
    rollback_conn_body = copy.deepcopy(body)
×
311
    body.update(new_body)
×
312

313
    conn_status = ConnectionStateMachine.State.MODIFYING
×
314
    body, _ = connection_state_machine(body, conn_status)
×
315
    db_instance.update_field_in_json(
×
316
        MongoCollections.CONNECTIONS,
317
        service_id,
318
        "status",
319
        str(conn_status),
320
    )
321

322
    try:
×
323
        logger.info("Removing connection")
×
324
        remove_conn_reason, remove_conn_code = connection_handler.remove_connection(
×
325
            current_app.te_manager, service_id, "API"
326
        )
327

328
        if remove_conn_code // 100 != 2:
×
329
            conn_status = ConnectionStateMachine.State.DOWN
×
330
            body, _ = connection_state_machine(body, conn_status)
×
331
            db_instance.update_field_in_json(
×
332
                MongoCollections.CONNECTIONS,
333
                service_id,
334
                "status",
335
                str(conn_status),
336
            )
337
            response = {
×
338
                "service_id": service_id,
339
                "status": parse_conn_status(body["status"]),
340
                "reason": f"Failure to modify L2VPN during removal: {remove_conn_reason}",
341
            }
342
            return response, remove_conn_code
×
343

344
        logger.info(f"Removed connection: {service_id}")
×
345
    except Exception as e:
×
346
        logger.info(f"Delete failed (connection id: {service_id}): {e}")
×
347
        conn_status = ConnectionStateMachine.State.DOWN
×
348
        body, _ = connection_state_machine(body, conn_status)
×
349
        db_instance.update_field_in_json(
×
350
            MongoCollections.CONNECTIONS,
351
            service_id,
352
            "status",
353
            str(conn_status),
354
        )
355
        return f"Failed, reason: {e}", 500
×
356
    time.sleep(10)
×
357
    logger.info(
×
358
        f"Modifying: Placing new connection {service_id} with te_manager: {current_app.te_manager}"
359
    )
360
    # Reset: remove_connection archives/deletes the original entry,
361
    # so persist the patched request before re-placement.
362
    conn_status = ConnectionStateMachine.State.REQUESTED
×
363
    body["status"] = str(conn_status)
×
364
    body["oxp_success_count"] = 0
×
365
    body["oxp_response"] = {}
×
366
    db_instance.add_key_value_pair_to_db(MongoCollections.CONNECTIONS, service_id, body)
×
367
    reason, code = connection_handler.place_connection(current_app.te_manager, body)
×
368

369
    if code // 100 == 2:
×
370
        # Service created successfully
371
        # conn_status = ConnectionStateMachine.State.UNDER_PROVISIONING
372
        # body, _ = connection_state_machine(body, conn_status)
373
        # db_instance.add_key_value_pair_to_db(
374
        #    MongoCollections.CONNECTIONS, service_id, body
375
        # )
376
        code = 201
×
377
        logger.info(f"Placed: ID: {service_id} reason='{reason}', code={code}")
×
378
        response = {
×
379
            "service_id": service_id,
380
            "status": parse_conn_status(body["status"]),
381
            "reason": reason,
382
        }
383
        return response, code
×
384

385
    logger.info(
×
386
        f"Modifying: Failed to place new connection. ID: {service_id} reason='{reason}', code={code}"
387
    )
388
    logger.info("Rolling back to old connection.")
×
389

390
    # because above placement failed, so re-place the original connection request.
391

392
    rollback_conn_body["status"] = str(ConnectionStateMachine.State.REQUESTED)
×
393
    # used in lc_message_handler to count the oxp success response
394
    rollback_conn_body["oxp_success_count"] = 0
×
395
    rollback_conn_body["oxp_response"] = {}
×
396

397
    conn_request = rollback_conn_body
×
398
    conn_request["id"] = service_id
×
NEW
399
    conn_request["status"] = str(ConnectionStateMachine.State.REQUESTED)
×
NEW
400
    conn_request["oxp_success_count"] = 0
×
NEW
401
    conn_request["oxp_response"] = {}
×
NEW
402
    conn_request["late_cleanup_domains"] = []
×
NEW
403
    conn_request["partial_cleanup_requested"] = False
×
NEW
404
    conn_request["provisioning_timeout_handled"] = False
×
NEW
405
    conn_request["provisioning_started_at"] = time.time()
×
NEW
406
    conn_request.pop("timeout_reason", None)
×
UNCOV
407
    db_instance.add_key_value_pair_to_db(
×
408
        MongoCollections.CONNECTIONS, service_id, conn_request
409
    )
410

411
    rollback_conn_reason = "Rollback attempt did not complete"
×
412
    try:
×
413
        rollback_conn_reason, rollback_conn_code = connection_handler.place_connection(
×
414
            current_app.te_manager, conn_request
415
        )
416
        if rollback_conn_code // 100 == 2:
×
417
            # conn_status = ConnectionStateMachine.State.UNDER_PROVISIONING
418
            # rollback_conn_body, _ = connection_state_machine(
419
            #    rollback_conn_body, conn_status
420
            # )
421
            # db_instance.update_field_in_json(
422
            #    MongoCollections.CONNECTIONS,
423
            #    service_id,
424
            #    "status",
425
            #    str(conn_status),
426
            # )
427
            # still return 400 to indicate the patch request is not successful, since we have already rolled back to original connection, which is under provisioning state, so the connection is not down and not failed.
428
            rollback_conn_code = code
×
429
        else:
430
            conn_status = ConnectionStateMachine.State.REJECTED
×
431
            body, _ = connection_state_machine(body, conn_status)
×
432
            db_instance.update_field_in_json(
×
433
                MongoCollections.CONNECTIONS,
434
                service_id,
435
                "status",
436
                str(conn_status),
437
            )
NEW
438
            deadline = time.time() + ROLLBACK_SETTLE_TIMEOUT_SECONDS
×
NEW
439
            while time.time() < deadline:
×
NEW
440
                current_conn = db_instance.get_value_from_db(
×
441
                    MongoCollections.CONNECTIONS, service_id
442
                )
NEW
443
                current_status = current_conn.get("status") if current_conn else None
×
NEW
444
                if current_status != str(
×
445
                    ConnectionStateMachine.State.UNDER_PROVISIONING
446
                ):
NEW
447
                    break
×
NEW
448
                time.sleep(ROLLBACK_SETTLE_POLL_SECONDS)
×
UNCOV
449
        logger.info(
×
450
            f"Roll back connection result: ID: {service_id} reason='{rollback_conn_reason}', code={rollback_conn_code}"
451
        )
452
    except Exception as e:
×
453
        conn_status = ConnectionStateMachine.State.REJECTED
×
454
        db_instance.update_field_in_json(
×
455
            MongoCollections.CONNECTIONS,
456
            service_id,
457
            "status",
458
            str(conn_status),
459
        )
460
        logger.info(f"Rollback failed (connection id: {service_id}): {e}")
×
461
        rollback_conn_reason = f"Rollback failed: {e}"
×
462
        rollback_conn_code = 500
×
463

NEW
464
    response_code = code if rollback_conn_code // 100 == 2 else rollback_conn_code
×
UNCOV
465
    current_conn = db_instance.get_value_from_db(
×
466
        MongoCollections.CONNECTIONS, f"{service_id}"
467
    )
468
    response = {
×
469
        "service_id": service_id,
470
        "reason": f"Failure, rolled back to last successful L2VPN: {reason}",
471
        "status": parse_conn_status(
472
            current_conn.get("status", "") if current_conn else ""
473
        ),
474
    }
NEW
475
    return response, response_code
×
476

477

478
def get_archived_connections_by_id(service_id):
2✔
479
    """
480
    List archived connection by ID.
481

482
    :param service_id: ID of connection that needs to be fetched
483
    :type service_id: str
484

485
    :rtype: Connection
486
    """
487

488
    value = connection_handler.get_archived_connections(service_id)
2✔
489

490
    if not value:
2✔
491
        return "Archived connection not found", 404
2✔
492

493
    return {service_id: value}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc