• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

atlanticwave-sdx / sdx-controller / 24754368375

22 Apr 2026 12:53AM UTC coverage: 51.229% (-2.8%) from 53.992%
24754368375

Pull #524

github

web-flow
Merge dcd02c04f into 09bd011bd
Pull Request #524: Rollback on failed OXP POST

90 of 305 new or added lines in 5 files covered. (29.51%)

14 existing lines in 3 files now uncovered.

1355 of 2645 relevant lines covered (51.23%)

1.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.04
/sdx_controller/controllers/l2vpn_controller.py
1
import copy
2✔
2
import logging
2✔
3
import os
2✔
4
import time
2✔
5
import uuid
2✔
6

7
import connexion
2✔
8
from flask import current_app
2✔
9
from sdx_datamodel.connection_sm import ConnectionStateMachine
2✔
10
from sdx_datamodel.constants import MongoCollections
2✔
11

12
from sdx_controller.handlers.connection_handler import (
2✔
13
    ConnectionHandler,
14
    connection_state_machine,
15
    get_connection_status,
16
    parse_conn_status,
17
)
18

19
# from sdx_controller.models.l2vpn_service_id_body import L2vpnServiceIdBody  # noqa: E501
20
from sdx_controller.utils.db_utils import DbUtils
2✔
21

22
# Log record layout: level, timestamp, logger name, function name, line
# number, then the message.
LOG_FORMAT = (
    "%(levelname) -10s %(asctime)s %(name) -30s %(funcName) "
    "-35s %(lineno) -5d: %(message)s"
)
logger = logging.getLogger(__name__)
# Raise the "pika" logger's threshold to WARNING.
logging.getLogger("pika").setLevel(logging.WARNING)
# This module's log level is read from the LOG_LEVEL environment variable
# and falls back to DEBUG when the variable is unset.
logger.setLevel(logging.getLevelName(os.getenv("LOG_LEVEL", "DEBUG")))
# Upper bound, in seconds, on how long patch_connection() polls the stored
# connection after a successful rollback placement (default: 5).
ROLLBACK_SETTLE_TIMEOUT_SECONDS = float(
    os.getenv("ROLLBACK_SETTLE_TIMEOUT_SECONDS", "5")
)
# Sleep, in seconds, between two consecutive polls (default: 0.2).
ROLLBACK_SETTLE_POLL_SECONDS = float(os.getenv("ROLLBACK_SETTLE_POLL_SECONDS", "0.2"))

# Get DB connection and tables set up.
# NOTE: this runs at import time; the handler is built on top of the
# already-initialized DB instance, so the order of these three lines matters.
db_instance = DbUtils()
db_instance.initialize_db()
connection_handler = ConnectionHandler(db_instance)
2✔
38

39

40
def delete_connection(service_id):
    """
    Delete connection order by ID.

    Looks the connection up in the connections collection, asks the
    connection handler to remove it and, when the handler answers with a
    2xx code, marks both the connection and its breakdown as deleted.

    :param service_id: ID of the connection that needs to be
        deleted
    :type service_id: str

    :return: ``("OK", 200)`` on success; ``("Did not find connection", 404)``
        when the ID is unknown; the handler's own ``(reason, code)`` when it
        answers with a non-2xx code; ``("Failed, reason: ...", 500)`` when
        any step raises.
    :rtype: tuple
    """
    logger.info(
        f"Handling delete (service id: {service_id}) "
        f"with te_manager: {current_app.te_manager}"
    )

    # NOTE: looking up by UUID does not seem to work yet; tracked in
    # https://github.com/atlanticwave-sdx/sdx-controller/issues/252.

    try:
        # TODO: pce's unreserve_vlan() method silently returns even if the
        # service_id is not found.  This should in fact be an error.
        #
        # https://github.com/atlanticwave-sdx/pce/issues/180
        connection = db_instance.get_value_from_db(
            MongoCollections.CONNECTIONS, f"{service_id}"
        )

        if not connection:
            return "Did not find connection", 404

        logger.info(f"connection: {connection} {type(connection)}")
        logger.info(f"Removing connection: {service_id} {connection.get('status')}")

        reason, code = connection_handler.remove_connection(
            current_app.te_manager, service_id, "API"
        )
        if code // 100 != 2:
            # Removal was refused: leave the DB records untouched and pass
            # the handler's verdict straight back to the caller.
            return reason, code

        db_instance.mark_deleted(MongoCollections.CONNECTIONS, f"{service_id}")
        db_instance.mark_deleted(MongoCollections.BREAKDOWNS, f"{service_id}")
    except Exception as e:
        # API boundary: report the failure as a 500 instead of propagating,
        # but keep the traceback in the log so the cause can be diagnosed.
        logger.exception(f"Delete failed (connection id: {service_id}): {e}")
        return f"Failed, reason: {e}", 500

    return "OK", 200
2✔
90

91

92
def get_connection_by_id(service_id):
    """
    Fetch the status record of a single connection.

    :param service_id: ID of connection that needs to be fetched
    :type service_id: str

    :return: the result of ``get_connection_status`` for this ID, or
        ``("Connection not found", 404)`` when that result is falsy.
    """
    status = get_connection_status(db_instance, service_id)
    return status if status else ("Connection not found", 404)
2✔
108

109

110
def get_connections():  # noqa: E501
    """
    List all connections.

    Walks every entry of the connections collection, takes the first key of
    each entry as its service ID and collects the status stored under that
    ID.  Entries whose status lookup comes back falsy are left out.

    :return: mapping of service ID to connection status, or
        ``("No connection was found", 404)`` when the collection is empty.
    """
    entries = db_instance.get_all_entries_in_collection(MongoCollections.CONNECTIONS)
    if not entries:
        return "No connection was found", 404

    statuses = {}
    for entry in entries:
        service_id = next(iter(entry))
        logger.info(f"service_id: {service_id}")
        status = get_connection_status(db_instance, service_id)
        if not status:
            continue
        statuses[service_id] = status.get(service_id)
    return statuses
2✔
129

130

131
def get_archived_connections():
    """
    List all archived connections.

    The first key of every entry in the historical-connections collection is
    used as a service ID; the archived events the connection handler returns
    for that ID are collected when they are non-empty.

    :return: mapping of service ID to archived events, or
        ``("No archived connection was found", 404)`` when nothing is found.
    :rtype: dict
    """
    not_found = ("No archived connection was found", 404)

    entries = db_instance.get_all_entries_in_collection(
        MongoCollections.HISTORICAL_CONNECTIONS
    )
    if not entries:
        return not_found

    archived = {}
    for entry in entries:
        service_id = next(iter(entry))
        events = connection_handler.get_archived_connections(service_id)
        if not events:
            continue
        archived[service_id] = events

    return archived if archived else not_found
2✔
153

154

155
def place_connection(body):
    """
    Place a connection request from the SDX-Controller.

    The ``body`` argument is only logged; the payload that is processed is
    the JSON document of the current connexion request.  The request is
    stored in the connections collection and then handed to the connection
    handler; a non-2xx answer flips the stored status to REJECTED.

    :param body: order placed for creating a connection
    :type body: dict | bytes

    :return: ``(response, code)`` where ``response`` holds ``service_id``,
        ``status`` and ``reason``, or ``("Request body must be JSON", 400)``
        when the request is not JSON.
    """
    logger.info(f"Placing connection: {body}")

    request = connexion.request
    if not request.is_json:
        return "Request body must be JSON", 400

    body = request.get_json()
    logger.info(f"Gathered connexion JSON: {body}")
    logger.info("Placing connection. Saving to database.")

    service_id = body.get("id")
    if service_id is None:
        # The caller did not choose an ID: mint one and record it.
        service_id = str(uuid.uuid4())
        body["id"] = service_id
        logger.info(f"Request has no ID. Generated ID: {service_id}")

    body.update(
        {
            "status": str(ConnectionStateMachine.State.REQUESTED),
            # used in lc_message_handler to count the oxp success response
            "oxp_success_count": 0,
            "partial_cleanup_requested": False,
            "provisioning_timeout_handled": False,
            "provisioning_started_at": time.time(),
        }
    )
    body.pop("timeout_reason", None)

    conn_status = ConnectionStateMachine.State.UNDER_PROVISIONING
    body, _ignored = connection_state_machine(body, conn_status)

    db_instance.add_key_value_pair_to_db(MongoCollections.CONNECTIONS, service_id, body)

    logger.info(
        f"Handling request {service_id} with te_manager: {current_app.te_manager}"
    )
    reason, code = connection_handler.place_connection(current_app.te_manager, body)

    placed = code // 100 == 2
    if not placed:
        conn_status = ConnectionStateMachine.State.REJECTED
        db_instance.update_field_in_json(
            MongoCollections.CONNECTIONS,
            service_id,
            "status",
            str(conn_status),
        )
    logger.info(
        f"place_connection result: ID: {service_id} reason='{reason}', code={code}"
    )

    # TODO: the response is supposed to be shaped just like the request
    # ('#/components/schemas/connection'); see
    # https://github.com/atlanticwave-sdx/sdx-controller/issues/251
    response = dict(
        service_id=service_id,
        status=parse_conn_status(str(conn_status)),
        reason=reason,
    )
    return response, code
2✔
229

230

231
def _reset_provisioning_bookkeeping(conn, clear_oxp_state=True):
    """Reset, in place, the per-attempt provisioning counters/flags of *conn*.

    :param conn: connection request (mutable mapping) to reset
    :param clear_oxp_state: when true, also empty ``oxp_response`` and
        ``late_cleanup_domains``
    """
    conn["oxp_success_count"] = 0
    if clear_oxp_state:
        conn["oxp_response"] = {}
        conn["late_cleanup_domains"] = []
    conn["partial_cleanup_requested"] = False
    conn["provisioning_timeout_handled"] = False
    conn["provisioning_started_at"] = time.time()
    conn.pop("timeout_reason", None)


def _wait_for_rollback_to_settle(service_id):
    """Poll the stored connection until it leaves UNDER_PROVISIONING.

    Gives up after ``ROLLBACK_SETTLE_TIMEOUT_SECONDS``, sleeping
    ``ROLLBACK_SETTLE_POLL_SECONDS`` between polls.  A missing record also
    ends the wait.
    """
    under_provisioning = str(ConnectionStateMachine.State.UNDER_PROVISIONING)
    # Monotonic clock: the deadline must not move when the wall clock is
    # adjusted while we are waiting.
    deadline = time.monotonic() + ROLLBACK_SETTLE_TIMEOUT_SECONDS
    while time.monotonic() < deadline:
        current_conn = db_instance.get_value_from_db(
            MongoCollections.CONNECTIONS, service_id
        )
        current_status = current_conn.get("status") if current_conn else None
        if current_status != under_provisioning:
            break
        time.sleep(ROLLBACK_SETTLE_POLL_SECONDS)


def patch_connection(service_id, body=None):  # noqa: E501
    """Edit and change an existing L2vpn connection by ID from the SDX-Controller

    The stored connection is merged with the JSON body of the current
    request, the existing connection is removed, and the merged request is
    placed.  When that placement fails, the pre-modification request is
    placed again as a rollback.

    :param service_id: ID of l2vpn connection that needs to be changed
    :type service_id: str
    :param body: not read; it is replaced by the stored connection, and the
        changes come from the JSON body of the current request
    :type body: dict | bytes

    :return: ``(response, code)`` with ``service_id``, ``status`` and
        ``reason`` in ``response``.  ``code`` is 201 when the modified
        connection was placed; the failed placement's code when the rollback
        succeeded; the rollback's code when the rollback was refused too.
        Plain ``(message, code)`` tuples are returned for 404 (unknown ID),
        400 (non-JSON request) and 500 (an exception during removal or
        rollback).
    """
    body = db_instance.get_value_from_db(MongoCollections.CONNECTIONS, f"{service_id}")
    if not body:
        return "Connection not found", 404

    if not connexion.request.is_json:
        return "Request body must be JSON", 400

    new_body = connexion.request.get_json()

    logger.info(f"Gathered connexion JSON: {new_body}")

    # Preserve the last successful request so rollback can recreate it cleanly.
    rollback_conn_body = copy.deepcopy(body)
    body.update(new_body)

    body, _ = connection_state_machine(body, ConnectionStateMachine.State.MODIFYING)
    _reset_provisioning_bookkeeping(body, clear_oxp_state=False)

    db_instance.add_key_value_pair_to_db(MongoCollections.CONNECTIONS, service_id, body)

    # Step 1: tear down the currently provisioned connection.
    try:
        logger.info("Removing connection")
        remove_conn_reason, remove_conn_code = connection_handler.remove_connection(
            current_app.te_manager, service_id, "API"
        )

        if remove_conn_code // 100 != 2:
            body, _ = connection_state_machine(body, ConnectionStateMachine.State.DOWN)
            db_instance.add_key_value_pair_to_db(
                MongoCollections.CONNECTIONS, service_id, body
            )
            response = {
                "service_id": service_id,
                "status": parse_conn_status(body["status"]),
                "reason": f"Failure to modify L2VPN during removal: {remove_conn_reason}",
            }
            return response, remove_conn_code

        logger.info(f"Removed connection: {service_id}")
    except Exception as e:
        logger.exception(
            f"Removal during modify failed (connection id: {service_id}): {e}"
        )
        return f"Failed, reason: {e}", 500

    # Step 2: place the modified connection.
    logger.info(
        f"Placing new connection {service_id} with te_manager: {current_app.te_manager}"
    )

    _reset_provisioning_bookkeeping(body)

    body, _ = connection_state_machine(
        body, ConnectionStateMachine.State.UNDER_PROVISIONING
    )
    db_instance.add_key_value_pair_to_db(MongoCollections.CONNECTIONS, service_id, body)
    reason, code = connection_handler.place_connection(current_app.te_manager, body)

    if code // 100 == 2:
        # Service created successfully
        code = 201
        logger.info(f"Placed: ID: {service_id} reason='{reason}', code={code}")
        response = {
            "service_id": service_id,
            "status": parse_conn_status(body["status"]),
            "reason": reason,
        }
        return response, code

    # NOTE(review): this DOWN transition is applied to the in-memory body
    # only and is not written to the DB here -- confirm that is intended for
    # the case where the rollback below fails as well.
    body, _ = connection_state_machine(body, ConnectionStateMachine.State.DOWN)

    logger.info(
        f"Failed to place new connection. ID: {service_id} reason='{reason}', code={code}"
    )
    logger.info("Rolling back to old connection.")

    # Step 3: the new placement failed, so re-place the original request.
    conn_request = rollback_conn_body
    conn_request["id"] = service_id
    conn_request["status"] = str(ConnectionStateMachine.State.REQUESTED)
    _reset_provisioning_bookkeeping(conn_request)
    conn_request, _ = connection_state_machine(
        conn_request, ConnectionStateMachine.State.UNDER_PROVISIONING
    )

    try:
        rollback_conn_reason, rollback_conn_code = connection_handler.place_connection(
            current_app.te_manager, conn_request
        )
        rolled_back = rollback_conn_code // 100 == 2
        if rolled_back:
            db_instance.add_key_value_pair_to_db(
                MongoCollections.CONNECTIONS, service_id, conn_request
            )
            _wait_for_rollback_to_settle(service_id)
        logger.info(
            f"Roll back connection result: ID: {service_id} reason='{rollback_conn_reason}', code={rollback_conn_code}"
        )
    except Exception as e:
        logger.exception(f"Rollback failed (connection id: {service_id}): {e}")
        return f"Rollback failed, reason: {e}", 500

    if rolled_back:
        # The modification itself still failed: report its code and reason.
        response_code = code
        response_reason = f"Failure, rolled back to last successful L2VPN: {reason}"
    else:
        # Do not claim a rollback that did not happen.
        response_code = rollback_conn_code
        response_reason = (
            f"Failure, unable to rollback to last successful L2VPN: {reason}"
            f" (rollback error: {rollback_conn_reason})"
        )

    response = {
        "service_id": service_id,
        "reason": response_reason,
        "status": parse_conn_status(conn_request["status"]),
    }
    return response, response_code
×
383

384

385
def get_archived_connections_by_id(service_id):
    """
    List the archived events of one connection.

    :param service_id: ID of connection that needs to be fetched
    :type service_id: str

    :return: ``{service_id: archived_events}``, or
        ``("Archived connection not found", 404)`` when the connection
        handler returns nothing for this ID.
    """
    archived_events = connection_handler.get_archived_connections(service_id)
    if archived_events:
        return {service_id: archived_events}
    return "Archived connection not found", 404
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc