• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

atlanticwave-sdx / sdx-controller / 25141814280

30 Apr 2026 12:57AM UTC coverage: 50.292% (-2.5%) from 52.799%
25141814280

Pull #524

github

web-flow
Merge f97220711 into 26d01cbb9
Pull Request #524: Rollback on failed OXP POST

112 of 359 new or added lines in 5 files covered. (31.2%)

141 existing lines in 4 files now uncovered.

1379 of 2742 relevant lines covered (50.29%)

1.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.78
/sdx_controller/__init__.py
1
import logging
2✔
2
import os
2✔
3
import threading
2✔
4
import time
2✔
5
from queue import Queue
2✔
6

7
import connexion
2✔
8
from sdx_datamodel.connection_sm import ConnectionStateMachine
2✔
9
from sdx_datamodel.constants import MongoCollections
2✔
10
from sdx_pce.topology.temanager import TEManager
2✔
11

12
from sdx_controller import encoder
2✔
13
from sdx_controller.handlers.connection_handler import (
2✔
14
    ConnectionHandler,
15
    connection_state_machine,
16
)
17
from sdx_controller.messaging.rpc_queue_consumer import RpcConsumer
2✔
18
from sdx_controller.utils.db_utils import DbUtils
2✔
19

20
logger = logging.getLogger(__name__)
2✔
21
logging.getLogger("pika").setLevel(logging.WARNING)
2✔
22
LOG_FILE = os.environ.get("LOG_FILE")
2✔
23
LOG_LEVEL = os.getenv("LOG_LEVEL", "DEBUG")
2✔
24
OXP_RESPONSE_TIMEOUT = int(os.getenv("OXP_RESPONSE_TIMEOUT", 60))
2✔
25
PROVISIONING_MONITOR_INTERVAL = int(os.getenv("PROVISIONING_MONITOR_INTERVAL", 5))
2✔
26

27

28
def create_rpc_thread(app):
2✔
29
    """
30
    Start a thread to get items off the message queue.
31
    """
32
    thread_queue = Queue()
2✔
33

34
    app.rpc_consumer = RpcConsumer(thread_queue, "", app.te_manager)
2✔
35
    rpc_thread = threading.Thread(
2✔
36
        target=app.rpc_consumer.start_sdx_consumer,
37
        kwargs={"thread_queue": thread_queue, "db_instance": app.db_instance},
38
        daemon=True,
39
    )
40

41
    rpc_thread.start()
2✔
42

43

44
def create_provisioning_timeout_thread(app):
2✔
45
    """
46
    Start a background monitor for connections stuck in
47
    UNDER_PROVISIONING longer than the configured timeout.
48
    """
49
    if OXP_RESPONSE_TIMEOUT <= 0:
2✔
NEW
50
        logger.info("[ProvisioningTimeout] Disabled.")
×
NEW
51
        app.provisioning_timeout_thread = None
×
NEW
52
        return
×
53

54
    connection_handler = ConnectionHandler(app.db_instance)
2✔
55

56
    def monitor_loop():
2✔
57
        logger.info(
2✔
58
            f"[ProvisioningTimeout] Started monitoring with timeout={OXP_RESPONSE_TIMEOUT}s interval={PROVISIONING_MONITOR_INTERVAL}s."
59
        )
60
        while True:
2✔
61
            try:
2✔
62
                now = time.time()
2✔
63
                connections = app.db_instance.get_all_entries_in_collection(
2✔
64
                    MongoCollections.CONNECTIONS
65
                )
66
                for connection_entry in connections:
2✔
67
                    service_id = next(iter(connection_entry), None)
2✔
68
                    connection = (
2✔
69
                        connection_entry.get(service_id) if service_id else None
70
                    )
71
                    if not isinstance(connection, dict):
2✔
NEW
72
                        continue
×
73
                    if connection.get("status") != str(
2✔
74
                        ConnectionStateMachine.State.UNDER_PROVISIONING
75
                    ):
76
                        continue
2✔
77

78
                    started_at = connection.get("provisioning_started_at")
2✔
79
                    if not isinstance(started_at, (int, float)):
2✔
NEW
80
                        continue
×
81
                    if connection.get("provisioning_timeout_handled"):
2✔
NEW
82
                        continue
×
83
                    if now - started_at < OXP_RESPONSE_TIMEOUT:
2✔
84
                        continue
2✔
85

NEW
86
                    logger.warning(
×
87
                        f"[ProvisioningTimeout] Connection {service_id} timed out after {int(now - started_at)}s waiting for OXP responses."
88
                    )
89

NEW
90
                    connection["provisioning_timeout_handled"] = True
×
NEW
91
                    connection["partial_cleanup_requested"] = True
×
NEW
92
                    connection["timeout_reason"] = (
×
93
                        f"OXP response timeout after {OXP_RESPONSE_TIMEOUT} seconds"
94
                    )
NEW
95
                    connection, _ = connection_state_machine(
×
96
                        connection, ConnectionStateMachine.State.DOWN
97
                    )
NEW
98
                    app.db_instance.add_key_value_pair_to_db(
×
99
                        MongoCollections.CONNECTIONS, service_id, connection
100
                    )
NEW
101
                    cleanup_status, cleanup_code = (
×
102
                        connection_handler.cleanup_partial_connection(
103
                            app.te_manager, service_id, connection
104
                        )
105
                    )
NEW
106
                    logger.info(
×
107
                        f"[ProvisioningTimeout] Cleanup result for {service_id}: {cleanup_status}, code={cleanup_code}"
108
                    )
NEW
109
            except Exception as e:
×
NEW
110
                logger.exception(
×
111
                    f"[ProvisioningTimeout] Error while monitoring connections: {e}"
112
                )
113
            time.sleep(PROVISIONING_MONITOR_INTERVAL)
2✔
114

115
    provisioning_thread = threading.Thread(target=monitor_loop, daemon=True)
2✔
116
    provisioning_thread.start()
2✔
117
    app.provisioning_timeout_thread = provisioning_thread
2✔
118

119

120
def create_app(run_listener: bool = True):
2✔
121
    """
122
    Create a connexion app.
123

124
    The object returned is a Connexion App, which in turn contains a
125
    Flask app, that we can run either with Flask or an ASGI server
126
    such as uvicorn::
127

128
        $ flask run sdx_server.app:app
129
        $ uvicorn run sdx_server.app:asgi_app
130

131
    We also create a thread that subscribes to our message queue.
132
    Occasionally it might be useful not to start the thread (such as
133
    when running the test suite, because currently our tests do not
134
    use the message queue), and we might want to disable those
135
    threads, which is when run_listener param might be useful.
136
    """
137
    if LOG_FILE:
2✔
138
        logging.basicConfig(filename=LOG_FILE, level=logging.getLevelName(LOG_LEVEL))
×
139
    else:
140
        logging.basicConfig(level=logging.getLevelName(LOG_LEVEL))
2✔
141

142
    logging.getLogger("sdx_pce.topology.temanager").setLevel(logging.INFO)
2✔
143
    app = connexion.App(__name__, specification_dir="./swagger/")
2✔
144
    app.app.json_encoder = encoder.JSONEncoder
2✔
145
    app.add_api(
2✔
146
        "swagger.yaml", arguments={"title": "SDX-Controller"}, pythonic_params=True
147
    )
148

149
    # Get DB connection and tables set up.
150
    app.db_instance = DbUtils()
2✔
151
    app.db_instance.initialize_db()
2✔
152

153
    # Get a handle to PCE.
154
    app.te_manager = TEManager(topology_data=None)
2✔
155

156
    # TODO: This is a hack, until we find a better way to get a handle
157
    # to TEManager from Flask current_app, which are typically
158
    # available to request handlers.  There must be a better way to
159
    # pass this around.
160
    app.app.te_manager = app.te_manager
2✔
161

162
    create_provisioning_timeout_thread(app)
2✔
163

164
    if run_listener:
2✔
165
        create_rpc_thread(app)
2✔
166
    else:
167
        app.rpc_consumer = None
×
168

169
    return app
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc