• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

atlanticwave-sdx / sdx-controller / 24897435355

24 Apr 2026 03:25PM UTC coverage: 52.799% (-1.2%) from 53.992%
24897435355

push

github

web-flow
Merge pull request #523 from atlanticwave-sdx/522-patch-return-code

return 400 for invalid patch request.
RabbitMQ new release 4.3.0

29 of 106 new or added lines in 4 files covered. (27.36%)

24 existing lines in 4 files now uncovered.

1292 of 2447 relevant lines covered (52.8%)

1.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.48
/sdx_controller/controllers/l2vpn_controller.py
1
import copy
2✔
2
import logging
2✔
3
import os
2✔
4
import time
2✔
5
import uuid
2✔
6

7
import connexion
2✔
8
from flask import current_app
2✔
9
from sdx_datamodel.connection_sm import ConnectionStateMachine
2✔
10
from sdx_datamodel.constants import MongoCollections
2✔
11

12
from sdx_controller.handlers.connection_handler import (
2✔
13
    ConnectionHandler,
14
    connection_state_machine,
15
    get_connection_status,
16
    parse_conn_status,
17
)
18

19
# from sdx_controller.models.l2vpn_service_id_body import L2vpnServiceIdBody  # noqa: E501
20
from sdx_controller.utils.db_utils import DbUtils
2✔
21

22
# Layout for log records: level, timestamp, logger name, function name,
# line number and message.  Nothing in the visible part of this module
# reads this constant.
LOG_FORMAT = (
    "%(levelname) -10s %(asctime)s %(name) -30s %(funcName) "
    "-35s %(lineno) -5d: %(message)s"
)

# Only let WARNING and above through from the "pika" logger.
logging.getLogger("pika").setLevel(logging.WARNING)

# Module logger; its level is taken from the LOG_LEVEL environment
# variable and defaults to DEBUG when that variable is unset.
logger = logging.getLogger(__name__)
logger.setLevel(logging.getLevelName(os.getenv("LOG_LEVEL", "DEBUG")))

# Get DB connection and tables set up.  The connection handler shares
# the same DbUtils instance as the request handlers below.
db_instance = DbUtils()
db_instance.initialize_db()
connection_handler = ConnectionHandler(db_instance)
34

35

36
def delete_connection(service_id):
    """
    Delete connection order by ID.

    Looks the connection up in the CONNECTIONS collection, advances its
    state machine to DELETED, asks the connection handler to remove it
    and finally marks both the connection and its breakdown as deleted.

    :param service_id: ID of the connection that needs to be
        deleted
    :type service_id: str

    :return: ``("OK", 200)`` on success, ``("Did not find connection",
        404)`` when no such connection is stored, or a failure message
        with code 500 when any step raises.
    :rtype: tuple[str, int]
    """
    logger.info(
        f"Handling delete (service id: {service_id}) "
        f"with te_manager: {current_app.te_manager}"
    )

    # Looking up by UUID does not seem to work yet.  Tracked in
    # https://github.com/atlanticwave-sdx/sdx-controller/issues/252.

    try:
        # TODO: pce's unreserve_vlan() method silently returns even if the
        # service_id is not found.  This should in fact be an error.
        #
        # https://github.com/atlanticwave-sdx/pce/issues/180
        connection = db_instance.get_value_from_db(
            MongoCollections.CONNECTIONS, f"{service_id}"
        )

        if not connection:
            return "Did not find connection", 404

        logger.info(f"connection: {connection} {type(connection)}")

        status = connection.get("status")
        if status is None:
            # No state to transition from: stamp the local copy directly.
            logger.error("Missing field: status is not in connection.")
            connection["status"] = str(ConnectionStateMachine.State.DELETED)
        else:
            # A connection that is still being provisioned is stepped
            # through DOWN first; every other state goes straight to DELETED.
            if status == str(ConnectionStateMachine.State.UNDER_PROVISIONING):
                connection, _ = connection_state_machine(
                    connection, ConnectionStateMachine.State.DOWN
                )
            connection, _ = connection_state_machine(
                connection, ConnectionStateMachine.State.DELETED
            )

        logger.info(f"Removing connection: {service_id} {connection.get('status')}")

        remove_reason, remove_code = connection_handler.remove_connection(
            current_app.te_manager, service_id, "API"
        )
        if remove_code // 100 != 2:
            # Deliberately best-effort: a failed removal is reported in the
            # log, but the records are still marked deleted below.
            logger.warning(
                f"Delete failed (connection id: {service_id}): "
                f"reason='{remove_reason}', code={remove_code}"
            )
        db_instance.mark_deleted(MongoCollections.CONNECTIONS, f"{service_id}")
        db_instance.mark_deleted(MongoCollections.BREAKDOWNS, f"{service_id}")
    except Exception as e:
        # Top-level handler boundary: keep the traceback in the log and
        # turn the failure into a 500 response.
        logger.exception(f"Delete failed (connection id: {service_id}): {e}")
        return f"Failed, reason: {e}", 500

    return "OK", 200
111

112

113
def get_connection_by_id(service_id):
    """
    Find connection by ID.

    :param service_id: ID of connection that needs to be fetched
    :type service_id: str

    :return: whatever ``get_connection_status`` yields for the ID, or
        ``("Connection not found", 404)`` when that result is empty.
    """
    connection_status = get_connection_status(db_instance, service_id)
    if connection_status:
        return connection_status

    return "Connection not found", 404
129

130

131
def get_connections():  # noqa: E501
    """
    List all connections

    Collects the status of every entry in the CONNECTIONS collection,
    keyed by service ID.  Entries for which no status can be obtained
    are left out of the result.

    :return: mapping of service ID to connection details, or
        ``("No connection was found", 404)`` when the collection is empty.
    """
    entries = db_instance.get_all_entries_in_collection(MongoCollections.CONNECTIONS)
    if not entries:
        return "No connection was found", 404

    connections_by_id = {}
    for entry in entries:
        # The first key of each stored entry is used as the service ID.
        service_id = next(iter(entry))
        logger.info(f"service_id: {service_id}")
        status = get_connection_status(db_instance, service_id)
        if not status:
            continue
        connections_by_id[service_id] = status.get(service_id)

    return connections_by_id
150

151

152
def get_archived_connections():
    """
    List all archived connections.

    :return: mapping of service ID to its archived events, or
        ``("No archived connection was found", 404)`` when the
        HISTORICAL_CONNECTIONS collection is empty or none of its
        entries has archived events.
    :rtype: dict
    """
    entries = db_instance.get_all_entries_in_collection(
        MongoCollections.HISTORICAL_CONNECTIONS
    )
    if not entries:
        return "No archived connection was found", 404

    archived_by_id = {}
    for entry in entries:
        # The first key of each stored entry is used as the service ID.
        service_id = next(iter(entry))
        events = connection_handler.get_archived_connections(service_id)
        if not events:
            continue
        archived_by_id[service_id] = events

    if archived_by_id:
        return archived_by_id
    return "No archived connection was found", 404
174

175

176
def place_connection(body):
    """
    Place a connection request from the SDX-Controller.

    The request JSON is re-read from ``connexion.request``; an ID is
    generated when the request carries none.  The request is stored with
    status REQUESTED before being handed to the connection handler, and
    is moved to REJECTED when the handler answers with a non-2xx code.

    :param body: order placed for creating a connection
    :type body: dict | bytes

    :return: ``(response, code)`` where ``response`` holds
        ``service_id``, ``status`` and ``reason``; or a plain message
        with code 400 when the request body is not a JSON object.
    :rtype: tuple
    """
    logger.info(f"Placing connection: {body}")
    if not connexion.request.is_json:
        return "Request body must be JSON", 400

    body = connexion.request.get_json()
    logger.info(f"Gathered connexion JSON: {body}")

    # Valid JSON is not necessarily an object (e.g. `null` or a list);
    # reject it here instead of failing on `body.get` below.
    if not isinstance(body, dict):
        return "Request body must be a JSON object", 400

    logger.info("Placing connection. Saving to database.")

    service_id = body.get("id")

    if service_id is None:
        service_id = str(uuid.uuid4())
        body["id"] = service_id
        logger.info(f"Request has no ID. Generated ID: {service_id}")

    conn_status = ConnectionStateMachine.State.REQUESTED
    body["status"] = str(conn_status)

    # used in lc_message_handler to count the oxp success response
    body["oxp_success_count"] = 0

    db_instance.add_key_value_pair_to_db(MongoCollections.CONNECTIONS, service_id, body)

    logger.info(
        f"Handling request {service_id} with te_manager: {current_app.te_manager}"
    )
    reason, code = connection_handler.place_connection(current_app.te_manager, body)

    if code // 100 == 2:
        # The stored status is left as it is on success; it is not
        # advanced to UNDER_PROVISIONING here.
        logger.info(f"place_connection succeeds: ID: {service_id} body='{body}'")
    else:
        conn_status = ConnectionStateMachine.State.REJECTED
        body, _ = connection_state_machine(body, conn_status)
        db_instance.update_field_in_json(
            MongoCollections.CONNECTIONS,
            service_id,
            "status",
            str(conn_status),
        )
    logger.info(
        f"place_connection result: ID: {service_id} reason='{reason}', code={code}"
    )

    # Report the status as currently stored, falling back to the local
    # view when the record or its status field is missing.
    current_conn = db_instance.get_value_from_db(
        MongoCollections.CONNECTIONS, f"{service_id}"
    )
    response = {
        "service_id": service_id,
        "status": parse_conn_status(
            current_conn.get("status", str(conn_status))
            if current_conn
            else str(conn_status)
        ),
        "reason": reason,
    }

    # TODO: our response is supposed to be shaped just like request
    # ('#/components/schemas/connection').
    #
    # https://github.com/atlanticwave-sdx/sdx-controller/issues/251

    return response, code
262

263

264
def _status_code_from_error(request_err):
    """Pick the HTTP status code to report for a failed patch validation.

    An integer ``request_code`` attribute on the exception wins; otherwise
    a trailing ``Code: NNN`` marker in the exception text is parsed; when
    neither is usable the code is 400.
    """
    error_code = getattr(request_err, "request_code", None)
    if isinstance(error_code, int):
        return error_code

    # Backward-compatible fallback for exception strings like "... (Code: 400)".
    err_text = str(request_err)
    if "Code:" not in err_text:
        return 400

    candidate = err_text.split("Code:")[-1].replace(")", "").strip()
    try:
        return int(candidate)
    except (TypeError, ValueError):
        logger.warning(
            f"Could not parse error code from patch validation error: {err_text}"
        )
        return 400


def _store_connection_status(service_id, conn_status):
    """Write ``conn_status`` into the stored connection's ``status`` field."""
    db_instance.update_field_in_json(
        MongoCollections.CONNECTIONS,
        service_id,
        "status",
        str(conn_status),
    )


def patch_connection(service_id, body=None):  # noqa: E501
    """Edit and change an existing L2vpn connection by ID from the SDX-Controller

    The patch is applied as remove-then-replace: the request is validated
    first, the existing connection is removed, and the merged request is
    placed again.  When the new placement fails, the original request is
    placed again as a rollback.

    :param service_id: ID of l2vpn connection that needs to be changed
    :type service_id: str
    :param body: not read; the stored connection is loaded from the
        database and the patch is read from ``connexion.request``
    :type body: dict | bytes

    :return: ``(response, code)``; ``response`` is a plain message for
        early rejections and a dict with ``service_id``, ``status`` and
        ``reason`` otherwise.  A successful patch returns code 201.
    :rtype: tuple
    """
    body = db_instance.get_value_from_db(MongoCollections.CONNECTIONS, f"{service_id}")
    if not body:
        return "Connection not found", 404

    if not connexion.request.is_json:
        return "Request body must be JSON", 400

    new_body = connexion.request.get_json()

    logger.info(f"Gathered connexion JSON: {new_body}")

    # Valid JSON is not necessarily an object (e.g. `null` or a list);
    # reject it here instead of failing on the membership test below.
    if not isinstance(new_body, dict):
        return "Request body must be a JSON object", 400

    if "id" not in new_body:
        new_body["id"] = service_id

    # Validate the new request body before making any change to the
    # existing connection.  Otherwise an invalid patch would be detected
    # only after the original connection had been removed, leaving it
    # deleted and not re-created.
    te_manager = current_app.te_manager
    try:
        te_manager.generate_traffic_matrix(connection_request=new_body)
    except Exception as request_err:
        logger.error("ERROR: invalid patch request: " + str(request_err))
        error_code = _status_code_from_error(request_err)
        return f"Error: patch request is not valid: {request_err}", error_code

    logger.info("Modifying connection")
    # Keep an untouched copy of the stored connection for rollback before
    # the patch is merged into it.
    rollback_conn_body = copy.deepcopy(body)
    body.update(new_body)

    conn_status = ConnectionStateMachine.State.MODIFYING
    body, _ = connection_state_machine(body, conn_status)
    _store_connection_status(service_id, conn_status)

    try:
        logger.info("Removing connection")
        remove_conn_reason, remove_conn_code = connection_handler.remove_connection(
            current_app.te_manager, service_id, "API"
        )

        if remove_conn_code // 100 != 2:
            conn_status = ConnectionStateMachine.State.DOWN
            body, _ = connection_state_machine(body, conn_status)
            _store_connection_status(service_id, conn_status)
            response = {
                "service_id": service_id,
                "status": parse_conn_status(body["status"]),
                "reason": f"Failure to modify L2VPN during removal: {remove_conn_reason}",
            }
            return response, remove_conn_code

        logger.info(f"Removed connection: {service_id}")
    except Exception as e:
        # Keep the traceback in the log; the connection is recorded as DOWN.
        logger.exception(f"Delete failed (connection id: {service_id}): {e}")
        conn_status = ConnectionStateMachine.State.DOWN
        body, _ = connection_state_machine(body, conn_status)
        _store_connection_status(service_id, conn_status)
        return f"Failed, reason: {e}", 500

    # NOTE(review): fixed pause between removal and re-placement;
    # presumably lets the removal settle -- confirm, and consider making
    # the delay configurable.
    time.sleep(10)
    logger.info(
        f"Modifying: Placing new connection {service_id} with te_manager: {current_app.te_manager}"
    )
    # Reset: remove_connection archives/deletes the original entry,
    # so persist the patched request before re-placement.
    conn_status = ConnectionStateMachine.State.REQUESTED
    body["status"] = str(conn_status)
    body["oxp_success_count"] = 0
    body["oxp_response"] = {}
    db_instance.add_key_value_pair_to_db(MongoCollections.CONNECTIONS, service_id, body)
    reason, code = connection_handler.place_connection(current_app.te_manager, body)

    if code // 100 == 2:
        # Service created successfully; any 2xx is reported as 201.
        code = 201
        logger.info(f"Placed: ID: {service_id} reason='{reason}', code={code}")
        response = {
            "service_id": service_id,
            "status": parse_conn_status(body["status"]),
            "reason": reason,
        }
        return response, code

    logger.info(
        f"Modifying: Failed to place new connection. ID: {service_id} reason='{reason}', code={code}"
    )
    logger.info("Rolling back to old connection.")

    # The new placement failed, so place the original connection request
    # again.
    rollback_conn_body["status"] = str(ConnectionStateMachine.State.REQUESTED)
    # used in lc_message_handler to count the oxp success response
    rollback_conn_body["oxp_success_count"] = 0
    rollback_conn_body["oxp_response"] = {}

    conn_request = rollback_conn_body
    conn_request["id"] = service_id
    db_instance.add_key_value_pair_to_db(
        MongoCollections.CONNECTIONS, service_id, conn_request
    )

    rolled_back = False
    rollback_conn_reason = "Rollback attempt did not complete"
    try:
        rollback_conn_reason, rollback_conn_code = connection_handler.place_connection(
            current_app.te_manager, conn_request
        )
        if rollback_conn_code // 100 == 2:
            # The patch itself still failed, so report the failed
            # placement's code even though the original connection has
            # been placed again.
            rolled_back = True
            rollback_conn_code = code
        else:
            conn_status = ConnectionStateMachine.State.REJECTED
            body, _ = connection_state_machine(body, conn_status)
            _store_connection_status(service_id, conn_status)
            rollback_conn_code = 500
        logger.info(
            f"Roll back connection result: ID: {service_id} reason='{rollback_conn_reason}', code={rollback_conn_code}"
        )
    except Exception as e:
        conn_status = ConnectionStateMachine.State.REJECTED
        _store_connection_status(service_id, conn_status)
        logger.exception(f"Rollback failed (connection id: {service_id}): {e}")
        rollback_conn_reason = f"Rollback failed: {e}"
        rollback_conn_code = 500

    # Only claim a rollback when it actually happened; otherwise surface
    # why the rollback failed alongside the original failure.
    if rolled_back:
        failure_reason = f"Failure, rolled back to last successful L2VPN: {reason}"
    else:
        failure_reason = (
            f"Failure, and rollback to last successful L2VPN also failed: {reason} "
            f"(rollback: {rollback_conn_reason})"
        )

    current_conn = db_instance.get_value_from_db(
        MongoCollections.CONNECTIONS, f"{service_id}"
    )
    response = {
        "service_id": service_id,
        "reason": failure_reason,
        "status": parse_conn_status(
            current_conn.get("status", "") if current_conn else ""
        ),
    }
    return response, rollback_conn_code
464

465

466
def get_archived_connections_by_id(service_id):
    """
    List archived connection by ID.

    :param service_id: ID of connection that needs to be fetched
    :type service_id: str

    :return: ``{service_id: archived_events}``, or
        ``("Archived connection not found", 404)`` when the connection
        handler has nothing archived for the ID.
    """
    archived_events = connection_handler.get_archived_connections(service_id)
    if archived_events:
        return {service_id: archived_events}

    return "Archived connection not found", 404
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc