• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uc-cdis / audit-service / 20283968191

16 Dec 2025 09:55PM UTC coverage: 79.798% (-1.0%) from 80.753%
20283968191

Pull #101

github

jacob50231
Actual dockerfile solution
Pull Request #101: Upgrade to python3.13

1 of 1 new or added line in 1 file covered. (100.0%)

7 existing lines in 3 files now uncovered.

553 of 693 relevant lines covered (79.8%)

0.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.68
/src/audit/db.py
1
"""
2
This file houses the database logic.
3
For schema/model of particular tables, go to `models.py`
4

5
OVERVIEW
6
--------
7

8
We're using SQLAlchemy's async support alongside FastAPI's dependency injection.
9

10
This file contains the logic for database manipulation in a "data access layer"
11
class, such that other areas of the code have simple `.create_list()` calls which
12
won't require knowledge on how to manage the session or interact with the db.
13
The session will be managed by dep injection of FastAPI's endpoints.
14
The logic that sets up the sessions is in this file.
15

16

17
DETAILS
18
-------
19
What do we do in this file?
20

21
- We create a sqlalchemy engine and session maker factory as globals
22
    - This reads in the db URL from config
23
- We define a data access layer class here which isolates the database manipulations
24
    - All CRUD operations go through this interface instead of bleeding specific database
25
      manipulations into the higher level web app endpoint code
26
- We create a function which yields an instance of the data access layer class with
27
  a fresh session from the session maker factory
28
    - This is what gets injected into endpoint code using FastAPI's dep injections
29
"""
30
from contextlib import asynccontextmanager
1✔
31
from typing import Any, Dict, AsyncGenerator, List, Tuple, Optional
1✔
32
from datetime import datetime
1✔
33
from sqlalchemy import text, select, func, or_
1✔
34
from sqlalchemy.ext.asyncio import (
1✔
35
    AsyncEngine,
36
    AsyncSession,
37
    async_sessionmaker,
38
    create_async_engine,
39
)
40

41
from audit.config import config
1✔
42
from audit.models import PresignedUrl, Login
1✔
43
from audit import logger
1✔
44

45
# Module-level singletons, populated by `initiate_db()` at application startup
# and read back via `get_db_engine_and_sessionmaker()`.
engine: Optional[AsyncEngine] = None
# Factory producing AsyncSession instances; also created by `initiate_db()`.
async_sessionmaker_instance: Optional[async_sessionmaker] = None
47

48

49
async def initiate_db() -> None:
    """
    Initialize the global async database engine and session factory.

    Reads connection settings from `config` and must be called once at
    application startup, before `get_db_engine_and_sessionmaker()` or
    `get_data_access_layer()` are used.
    """
    global engine, async_sessionmaker_instance
    logger.info(f"DB_URL: {config['DB_URL']}")

    # Read the pool minimum once so `pool_size` and `max_overflow` always
    # agree. Previously a missing DB_POOL_MIN_SIZE defaulted pool_size to 15
    # but raised KeyError when computing max_overflow.
    pool_min_size = config.get("DB_POOL_MIN_SIZE", 15)
    engine = create_async_engine(
        url=config["DB_URL"],
        pool_size=pool_min_size,
        max_overflow=config["DB_POOL_MAX_SIZE"] - pool_min_size,
        echo=config["DB_ECHO"],
        # Only pass SSL settings through when SSL is configured.
        connect_args={"ssl": config["DB_SSL"]} if config["DB_SSL"] else {},
        # Validate pooled connections before use so stale ones are recycled.
        pool_pre_ping=True,
    )

    # Creates AsyncSession instances; expire_on_commit=False keeps ORM
    # objects usable after the surrounding transaction commits.
    async_sessionmaker_instance = async_sessionmaker(
        bind=engine, expire_on_commit=False
    )
68

69

70
def get_db_engine_and_sessionmaker() -> tuple[AsyncEngine, async_sessionmaker]:
    """
    Return the module-level engine and sessionmaker created by `initiate_db()`.

    Returns:
        A `(AsyncEngine, async_sessionmaker)` tuple.

    Raises:
        RuntimeError: if `initiate_db()` has not been called yet.
    """
    global engine, async_sessionmaker_instance
    if engine is None or async_sessionmaker_instance is None:
        # RuntimeError is more precise than a bare Exception and, being a
        # subclass, is still caught by existing `except Exception` handlers.
        raise RuntimeError("Database not initialized. Call initiate_db() first.")
    return engine, async_sessionmaker_instance
78

79

80
class DataAccessLayer:
    """
    Defines an abstract interface to manipulate the database. Instances are given a session to
    act within.

    All CRUD operations go through this class so endpoint code never has to
    manage sessions or build queries directly. Pagination in `query_logs`
    relies entirely on `timestamp` ordering; see the comments there.
    """

    def __init__(self, db_session: AsyncSession):
        # The AsyncSession this instance operates within; its lifecycle
        # (commit/rollback/close) is managed by the caller (FastAPI dep injection).
        self.db_session = db_session

    async def test_connection(self) -> None:
        """
        Ensure we can actually communicate with the db
        by executing a trivial `SELECT 1;` (connection errors propagate).
        """
        await self.db_session.execute(text("SELECT 1;"))

    def _apply_query_filters(
        self, model, query, query_params, start_date=None, stop_date=None
    ):
        """
        Apply filters to a SQLAlchemy query based on the provided parameters.
        Raises ValueError for invalid field values.

        Args:
            model: ORM model class (e.g. `PresignedUrl` or `Login`); must
                expose a `timestamp` column and the fields in `query_params`.
            query: the SQLAlchemy select to which WHERE clauses are added.
            query_params: mapping of field name -> list of accepted values;
                values within one field are OR-ed, separate fields are AND-ed.
            start_date: optional inclusive lower bound on `model.timestamp`.
            stop_date: optional exclusive upper bound on `model.timestamp`.

        Returns:
            The query with all filters applied.
        """

        def _cast_field_value(model, field, value):
            # Cast the raw query value to the column's Python type so the
            # comparison in the WHERE clause is well-typed.
            field_type = getattr(model, field).type.python_type
            if field_type == datetime:
                try:
                    # Datetime fields are queried as unix timestamps.
                    # NOTE(review): fromtimestamp() converts using the local
                    # timezone — confirm this matches how rows are stored.
                    return datetime.fromtimestamp(int(value))
                except ValueError as e:
                    raise ValueError(
                        f"Unable to convert value '{value}' to datetime for field '{field}': {e}"
                    )
            try:
                return field_type(value)
            except ValueError as e:
                raise ValueError(
                    f"Value '{value}' is not valid for field '{field}': {e}"
                )

        if start_date:
            query = query.where(model.timestamp >= start_date)
        if stop_date:
            # stop_date is an exclusive upper bound (strict `<`).
            query = query.where(model.timestamp < stop_date)

        for field, values in query_params.items():
            column = getattr(model, field)

            # TODO for resource_paths, implement filtering in a way that
            # would return "/A/B" when querying "/A".
            if hasattr(column.type, "item_type"):  # ARRAY
                # ARRAY columns match when the stored array overlaps any of
                # the requested values; values are used as-is (no casting).
                query = query.where(column.overlap(values))
            else:
                typed_values = [_cast_field_value(model, field, v) for v in values]
                # OR together the accepted values for this field.
                query = query.where(or_(*(column == v for v in typed_values)))

        return query

    async def query_logs(
        self, model, start_date, stop_date, query_params, count
    ) -> Tuple[List[Dict[str, Any]], Optional[int]]:
        """
        Query logs from the database with pagination support.
        Returns a tuple of (logs, next_timestamp).

        When `count` is truthy, no page-size limit is applied and the raw ORM
        rows are returned with `next_timestamp` set to None (no pagination).
        Otherwise logs are returned as dicts (`to_dict()`), the page size is
        capped by `QUERY_PAGE_SIZE` (but may grow to include all rows sharing
        the last timestamp), and `next_timestamp` is the unix timestamp where
        the next page starts, or None when this is the last page.
        """
        # get all logs matching the filters and apply the page size limit
        # Build initial query with filters
        base_query = select(model)
        base_query = self._apply_query_filters(
            model, base_query, query_params, start_date, stop_date
        )
        base_query = base_query.order_by(model.timestamp)
        if not count:
            base_query = base_query.limit(config["QUERY_PAGE_SIZE"])

        result = await self.db_session.execute(base_query)
        logs = result.scalars().all()

        if not logs or count:
            # `count` queries are not paginated: no next timestamp
            return logs, None

        # if there are more logs with the same timestamp as the last queried log, also return them.
        # We use timestamp as the primary key for our queries and sorting.
        # We'll have to add something like a request.uuid to the records if we want to enforce page sizes and sort order.
        last_timestamp = logs[-1].timestamp

        # Get extra logs with the same timestamp as the last one
        extra_query = select(model)
        extra_query = self._apply_query_filters(
            model, extra_query, query_params, start_date, stop_date
        )
        extra_query = extra_query.where(model.timestamp == last_timestamp).order_by(
            model.timestamp
        )
        extra_result = await self.db_session.execute(extra_query)
        extra_logs = extra_result.scalars().all()

        if len(extra_logs) > 1:
            # Replace the page's tail so ALL rows sharing `last_timestamp`
            # appear together (the page may then exceed QUERY_PAGE_SIZE).
            logs = [log for log in logs if log.timestamp != last_timestamp]
            logs.extend(extra_logs)

        # Get the next timestamp: the earliest row strictly after the last
        # timestamp on this page (None means there is no next page).
        next_query = select(model)
        next_query = self._apply_query_filters(
            model, next_query, query_params, start_date, stop_date
        )
        next_query = next_query.where(model.timestamp > last_timestamp).order_by(
            model.timestamp
        )
        next_result = await self.db_session.execute(next_query)
        next_log = next_result.scalars().first()

        next_timestamp = (
            int(datetime.timestamp(next_log.timestamp)) if next_log else None
        )

        logs = [log.to_dict() for log in logs]
        return logs, next_timestamp

    async def query_logs_with_grouping(
        self, model, start_date, stop_date, query_params, groupby
    ) -> List[Dict[str, Any]]:
        """
        Query logs from the database with grouping support.
        Returns a list of dictionaries containing the grouped data.

        Each dict holds the `groupby` field values plus a `count` key with the
        number of matching rows in that group.
        """
        select_list = [getattr(model, field) for field in groupby]
        # count(username) counts rows whose username is non-NULL.
        # NOTE(review): assumes `username` is never NULL on audit rows, so
        # this equals the group's row count — confirm against models.py.
        select_list.append(func.count(model.username).label("count"))
        query = select(*select_list)
        for field in groupby:
            query = query.group_by(getattr(model, field))
        query = self._apply_query_filters(
            model, query, query_params, start_date, stop_date
        )
        result = await self.db_session.execute(query)
        logs = result.all()
        # Row._mapping converts each result row to a field-name -> value dict.
        return [dict(row._mapping) for row in logs]

    async def create_presigned_url_log(self, data: Dict[str, Any]) -> None:
        """
        Create a new `presigned_url` audit log.

        The row id is drawn from the `global_presigned_url_id_seq` database
        sequence; `data` supplies the remaining column values. The row is only
        added to the session — the surrounding transaction performs the commit.
        NOTE: mutates the caller's `data` dict by inserting the generated id.
        """
        result = await self.db_session.execute(
            text("SELECT nextval('global_presigned_url_id_seq')")
        )
        data["id"] = result.scalar()
        self.db_session.add(PresignedUrl(**data))

    async def create_login_log(self, data: Dict[str, Any]) -> None:
        """
        Create a new `login` audit log.

        The row id is drawn from the `global_login_id_seq` database sequence;
        `data` supplies the remaining column values. The row is only added to
        the session — the surrounding transaction performs the commit.
        NOTE: mutates the caller's `data` dict by inserting the generated id.
        """
        result = await self.db_session.execute(
            text("SELECT nextval('global_login_id_seq')")
        )
        data["id"] = result.scalar()
        self.db_session.add(Login(**data))
237

238

239
async def get_data_access_layer() -> AsyncGenerator[DataAccessLayer, Any]:
    """
    Create an AsyncSession and yield an instance of the Data Access Layer,
    which acts as an abstract interface to manipulate the database.

    Can be injected as a dependency in FastAPI endpoints. The surrounding
    `session.begin()` block commits on normal exit and rolls back on error.

    Raises:
        RuntimeError/Exception: if `initiate_db()` has not been called yet.
    """
    # Go through the accessor so an uninitialized database fails with a clear
    # error message instead of "'NoneType' object is not callable".
    _, sessionmaker_instance = get_db_engine_and_sessionmaker()
    async with sessionmaker_instance() as session:
        async with session.begin():
            yield DataAccessLayer(session)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc