• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

atlanticwave-sdx / sdx-controller / 10494965971

21 Aug 2024 05:57PM UTC coverage: 58.211%. Remained the same
10494965971

push

github

web-flow
Merge pull request #319 from atlanticwave-sdx/update-sdx-message

Update SDX message when no topology is available yet

465 of 735 branches covered (63.27%)

Branch coverage included in aggregate %.

1 of 3 new or added lines in 2 files covered. (33.33%)

1038 of 1847 relevant lines covered (56.2%)

2.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.89
/sdx_controller/__init__.py
1
import logging
4✔
2
import os
4✔
3
import threading
4✔
4
from queue import Queue
4✔
5

6
import connexion
4✔
7
from sdx_pce.topology.temanager import TEManager
4✔
8

9
from sdx_controller import encoder
4✔
10
from sdx_controller.messaging.rpc_queue_consumer import RpcConsumer
4✔
11
from sdx_controller.utils.db_utils import DbUtils
4✔
12

13
logger = logging.getLogger(__name__)
4✔
14
logging.getLogger("pika").setLevel(logging.WARNING)
4✔
15
LOG_FILE = os.environ.get("LOG_FILE")
4✔
16

17

18
def create_rpc_thread(app):
4✔
19
    """
20
    Start a thread to get items off the message queue.
21
    """
22
    thread_queue = Queue()
4✔
23

24
    app.rpc_consumer = RpcConsumer(thread_queue, "", app.te_manager)
4✔
25
    rpc_thread = threading.Thread(
4✔
26
        target=app.rpc_consumer.start_sdx_consumer,
27
        kwargs={"thread_queue": thread_queue, "db_instance": app.db_instance},
28
        daemon=True,
29
    )
30

31
    rpc_thread.start()
4✔
32

33

34
def create_app(run_listener: bool = True):
4✔
35
    """
36
    Create a connexion app.
37

38
    The object returned is a Connexion App, which in turn contains a
39
    Flask app, that we can run either with Flask or an ASGI server
40
    such as uvicorn::
41

42
        $ flask run sdx_server.app:app
43
        $ uvicorn run sdx_server.app:asgi_app
44

45
    We also create a thread that subscribes to our message queue.
46
    Occasionally it might be useful not to start the thread (such as
47
    when running the test suite, because currently our tests do not
48
    use the message queue), and we might want to disable those
49
    threads, which is when run_listener param might be useful.
50
    """
51
    if LOG_FILE:
4✔
NEW
52
        logging.basicConfig(filename=LOG_FILE, level=logging.DEBUG)
×
53
    else:
54
        logging.basicConfig(level=logging.DEBUG)
4✔
55

56
    app = connexion.App(__name__, specification_dir="./swagger/")
4✔
57
    app.app.json_encoder = encoder.JSONEncoder
4✔
58
    app.add_api(
4✔
59
        "swagger.yaml", arguments={"title": "SDX-Controller"}, pythonic_params=True
60
    )
61

62
    # Get DB connection and tables set up.
63
    app.db_instance = DbUtils()
4✔
64
    app.db_instance.initialize_db()
4✔
65

66
    # Get a handle to PCE.
67
    app.te_manager = TEManager(topology_data=None)
4✔
68

69
    # TODO: This is a hack, until we find a better way to get a handle
70
    # to TEManager from Flask current_app, which are typically
71
    # available to request handlers.  There must be a better way to
72
    # pass this around.
73
    app.app.te_manager = app.te_manager
4✔
74

75
    if run_listener:
4✔
76
        create_rpc_thread(app)
4✔
77
    else:
78
        app.rpc_consumer = None
×
79

80
    return app
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc