• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uc-cdis / audit-service / 20868243664

17 Dec 2025 05:14PM UTC coverage: 79.798% (-1.0%) from 80.753%
20868243664

push

github

web-flow
Upgrade to python3.13 and patch urllib3

* Upgrade to python3.13

* fix importlib.metadata entrypoints

* Apply automatic documentation changes

* upgrade boto3, botocore, and urllib3 for poam ticket

* update alembic

---------

Co-authored-by: jacob50231 <jacob50231@users.noreply.github.com>

1 of 1 new or added line in 1 file covered. (100.0%)

7 existing lines in 3 files now uncovered.

553 of 693 relevant lines covered (79.8%)

0.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.69
/src/audit/app.py
1
import asyncio
1✔
2
import httpx
1✔
3
import os
1✔
4
from fastapi import FastAPI
1✔
5
from contextlib import asynccontextmanager
1✔
6
from typing import AsyncIterable
1✔
7

8
from importlib.metadata import entry_points, version
1✔
9

10
from cdislogging import get_logger
1✔
11
from gen3authz.client.arborist.async_client import ArboristClient
1✔
12

13
from . import logger
1✔
14
from .config import config, DEFAULT_CFG_PATH
1✔
15

16
# Load the configuration *before* importing modules that rely on it
17
try:
1✔
18
    if os.environ.get("AUDIT_SERVICE_CONFIG_PATH"):
1✔
19
        config.load(config_path=os.environ["AUDIT_SERVICE_CONFIG_PATH"])
1✔
20
    else:
21
        CONFIG_SEARCH_FOLDERS = [
×
22
            "/src",
23
            "{}/.gen3/audit-service".format(os.path.expanduser("~")),
24
        ]
25
        config.load(search_folders=CONFIG_SEARCH_FOLDERS)
×
26
except Exception:
×
27
    logger.warning("Unable to load config, using default config...", exc_info=True)
×
28
    config.load(config_path=DEFAULT_CFG_PATH)
×
29

30
from .pull_from_queue import pull_from_queue_loop
1✔
31
from .db import initiate_db, DataAccessLayer, get_data_access_layer
1✔
32

33

34
def load_modules(app: FastAPI = None) -> None:
1✔
35
    for ep in entry_points(group="audit.modules"):
1✔
36
        logger.info("Loading module: %s", ep.name)
1✔
37
        mod = ep.load()
1✔
38
        if app:
1✔
39
            init_app = getattr(mod, "init_app", None)
1✔
40
            if init_app:
1✔
41
                init_app(app)
1✔
42

43

44
def app_init() -> FastAPI:
1✔
45
    logger.info("Initializing app")
1✔
46
    config.validate(logger)
1✔
47

48
    debug = config["DEBUG"]
1✔
49
    app = FastAPI(
1✔
50
        title="Audit Service",
51
        version=version("audit"),
52
        debug=debug,
53
        lifespan=lifespan,
54
        # root_path=config["DOCS_URL_PREFIX"],
55
    )
56
    app.add_middleware(ClientDisconnectMiddleware)
1✔
57
    app.async_client = httpx.AsyncClient()
1✔
58

59
    # Following will update logger level, propagate, and handlers
60
    get_logger("audit-service", log_level="debug" if debug == True else "info")
1✔
61

62
    logger.info("Initializing Arborist client")
1✔
63
    if os.environ.get("ARBORIST_URL"):
1✔
64
        app.arborist_client = ArboristClient(
×
65
            arborist_base_url=os.environ["ARBORIST_URL"],
66
            logger=logger,
67
        )
68
    else:
69
        app.arborist_client = ArboristClient(logger=logger)
1✔
70

71
    load_modules(app)
1✔
72

73
    return app
1✔
74

75

76
@asynccontextmanager
1✔
77
async def lifespan(app: FastAPI):
1✔
78
    """
79
    Parse the configuration, setup and instantiate necessary classes.
80

81
    This is FastAPI's way of dealing with startup logic before the app
82
    starts receiving requests.
83

84
    https://fastapi.tiangolo.com/advanced/events/#lifespan
85

86
    Args:
87
        app (fastapi.FastAPI): The FastAPI app object
88
    """
89
    # startup
90

91
    await initiate_db()
1✔
92
    await check_db_connection()
1✔
93

94
    if config["PULL_FROM_QUEUE"] and config["QUEUE_CONFIG"].get("type") == "aws_sqs":
1✔
95
        logger.info("Initiating SQS pull.")
×
96
        await initiate_sqs_pull()
×
97

98
    yield
1✔
99

100
    # teardown
101
    logger.info("Closing async client.")
1✔
102
    await app.async_client.aclose()
1✔
103
    logger.info("[Completed] Closing async client.")
1✔
104

105

106
async def check_db_connection():
1✔
107
    """
108
    Simple check to ensure we can talk to the db
109
    """
110
    try:
1✔
111
        logger.debug(
1✔
112
            "Startup database connection test initiating. Attempting a simple query..."
113
        )
114
        data_access_layers: AsyncIterable[DataAccessLayer] = get_data_access_layer()
1✔
115
        async for data_access_layer in data_access_layers:
1✔
116
            await data_access_layer.test_connection()
1✔
UNCOV
117
            logger.debug("Startup database connection test PASSED.")
×
118
    except Exception as exc:
×
119
        logger.exception(
×
120
            "Startup database connection test FAILED. Unable to connect to the configured database."
121
        )
122
        logger.debug(exc)
×
123
        raise
×
124

125

126
async def initiate_sqs_pull():
1✔
127
    """
128
    Start the SQS pull loop in the background."""
129

130
    def handle_exception(loop, context):
×
131
        """
132
        Whenever an exception occurs in the asyncio loop, the loop still continues to execute without crashing.
133
        Therefore, we implement a custom exception handler that will ensure that the loop is stopped upon an Exception.
134
        """
135
        msg = context.get("exception", context.get("message"))
×
136
        logger.error(f"Caught exception: {msg}")
×
137
        for _, task in enumerate(asyncio.all_tasks()):
×
138
            task.cancel()
×
139
        logger.info("Closed all tasks")
×
140

141
    loop = asyncio.get_running_loop()
×
142
    loop.create_task(pull_from_queue_loop())
×
143
    loop.set_exception_handler(handle_exception)
×
144

145

146
class ClientDisconnectMiddleware:
1✔
147
    def __init__(self, app):
1✔
148
        self._app = app
1✔
149

150
    async def __call__(self, scope, receive, send):
1✔
151
        loop = asyncio.get_running_loop()
1✔
152
        rv = loop.create_task(self._app(scope, receive, send))
1✔
153
        waiter = None
1✔
154
        cancelled = False
1✔
155
        if scope["type"] == "http":
1✔
156

157
            def add_close_watcher():
1✔
158
                nonlocal waiter
159

160
                async def wait_closed():
×
161
                    nonlocal cancelled
UNCOV
162
                    while True:
×
163
                        message = await receive()
×
164
                        if message["type"] == "http.disconnect":
×
165
                            if not rv.done():
×
166
                                cancelled = True
×
167
                                rv.cancel()
×
168
                            break
×
169

170
                waiter = loop.create_task(wait_closed())
×
171

172
            scope["add_close_watcher"] = add_close_watcher
1✔
173
        try:
1✔
174
            await rv
1✔
175
        except asyncio.CancelledError:
×
176
            if not cancelled:
×
177
                raise
×
178
        if waiter and not waiter.done():
1✔
179
            waiter.cancel()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc