• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uc-cdis / audit-service / 15498362459

06 Jun 2025 07:36PM UTC coverage: 79.016% (+1.7%) from 77.309%
15498362459

push

github

web-flow
Migrate Audit Service from GINO to Async SQLAlchemy (#80)


---------

Co-authored-by: Alexander VanTol <avantol@uchicago.edu>
Co-authored-by: Albert Snow <ajsnow2012@gmail.com>
Co-authored-by: MaribelleHGomez <maribellehgomez@gmail.com>
Co-authored-by: Ao Liu (frankliuao) <frankliuao@gmail.com>
Co-authored-by: nss10 <nss10@users.noreply.github.com>

228 of 282 new or added lines in 10 files covered. (80.85%)

7 existing lines in 3 files now uncovered.

482 of 610 relevant lines covered (79.02%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.13
/src/audit/pull_from_queue.py
1
import asyncio
1✔
2
import boto3
1✔
3
import json
1✔
4
import traceback
1✔
5

6
from . import logger
1✔
7
from .config import config
1✔
8
from .models import CATEGORY_TO_MODEL_CLASS
1✔
9
from .utils.validate_utils import validate_presigned_url_log, validate_login_log
1✔
10
from .db import get_data_access_layer
1✔
11

12

13
async def process_log(
    data,
    timestamp,
):
    """
    Validate a single audit log record and persist it through the data access layer.

    Args:
        data (dict): decoded audit log message. Its "category" key is removed
            (the dict is mutated in place), and a "timestamp" key is set to
            `timestamp` when missing or falsy.
        timestamp (int): fallback timestamp used when `data` has none; the
            caller in this module passes the message's SQS SentTimestamp in
            seconds.

    Raises:
        AssertionError: if the category is missing/empty or is not a key of
            `CATEGORY_TO_MODEL_CLASS`.
        Exceptions raised by the validation helpers or by the data access
        layer propagate unchanged to the caller.
    """
    # check log category
    category = data.pop("category", None)
    # Raise explicitly rather than using `assert`: assert statements are
    # removed when Python runs with -O, which would let an unknown category
    # fall through and be reported as successfully processed. AssertionError
    # is kept so callers see the same exception type as before.
    if not category or category not in CATEGORY_TO_MODEL_CLASS:
        raise AssertionError(f"Unknown log category {category}")

    # if the timestamp was not provided, default to the message's SentTimestamp
    if not data.get("timestamp"):
        data["timestamp"] = timestamp

    # validate log before opening a database session, so that invalid
    # messages never acquire one
    if category == "presigned_url":
        validate_presigned_url_log(data)
    elif category == "login":
        validate_login_log(data)
    # NOTE(review): if CATEGORY_TO_MODEL_CLASS ever holds categories other than
    # the two handled here, they pass the check above but are not stored —
    # confirm that is intended.

    async for data_access_layer in get_data_access_layer():
        if category == "presigned_url":
            await data_access_layer.create_presigned_url_log(data)
        elif category == "login":
            await data_access_layer.create_login_log(data)
35

36

37
async def pull_from_queue(sqs):
    """
    Receive one batch of messages from the configured SQS queue and process them.

    Each message body is decoded as JSON and passed to `process_log`. Messages
    that are processed successfully are deleted from the queue. Messages that
    fail at any step (undecodable body, missing attributes, processing error)
    are logged and left in the queue so SQS can deliver them again.

    Args:
        sqs: boto3 SQS client used to receive and delete messages.

    Returns:
        bool: True when the caller should sleep before polling again, i.e. when
        no messages were received or any step failed.
    """
    failed = False
    messages = []
    try:
        queue_url = config["QUEUE_CONFIG"]["aws_sqs_config"]["sqs_url"]
        # NOTE: boto3 clients are synchronous; this call (and delete_message
        # below) blocks the event loop while the request is in flight.
        response = sqs.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=10,  # 10 is the max allowed by AWS
            AttributeNames=["SentTimestamp"],
        )
        messages = response.get("Messages", [])
    except Exception as e:
        failed = True
        logger.error(f"Error pulling from queue: {e}")
        traceback.print_exc()

    for message in messages:
        try:
            # Decoding is inside the try on purpose: a single malformed message
            # must not raise out of this function (which would stop the polling
            # loop); it is logged and left in the queue instead.
            data = json.loads(message["Body"])
            receipt_handle = message["ReceiptHandle"]
            # when the message was sent to the queue (epoch milliseconds)
            sent_timestamp = message["Attributes"]["SentTimestamp"]
            timestamp = int(sent_timestamp) // 1000  # ms to s, exact integer math
            await process_log(data, timestamp)
        except Exception as e:
            failed = True
            logger.error(f"Error processing audit log: {e}")
            traceback.print_exc()
        else:
            # delete message from queue once successfully processed
            try:
                sqs.delete_message(
                    QueueUrl=queue_url,
                    ReceiptHandle=receipt_handle,
                )
            except Exception as e:
                failed = True
                logger.error(f"Error deleting message from queue: {e}")
                traceback.print_exc()

    # if the queue is empty, or we failed to process a message: sleep
    should_sleep = not messages or failed
    return should_sleep
79

80

81
async def pull_from_queue_loop():
    """
    Poll the configured queue forever, storing the audit logs it contains.

    Note that `pull_from_queue_loop` and `pull_from_queue` only handle
    AWS SQS right now.

    Credentials for the SQS client are chosen in this order:
    - `config["AWS_CREDENTIALS"][aws_sqs_config["aws_cred"]]` when `aws_cred`
      is set;
    - otherwise `aws_access_key_id` / `aws_secret_access_key` set directly in
      the SQS config (legacy format);
    - otherwise None is passed for both keys (presumably letting boto3 use its
      default credential resolution — confirm for the deployment).

    Between polls the loop sleeps `config["PULL_FREQUENCY_SECONDS"]` seconds
    whenever `pull_from_queue` asks for it or raises unexpectedly.
    """
    logger.info("Starting to pull from queue...")
    aws_sqs_config = config["QUEUE_CONFIG"]["aws_sqs_config"]
    # we know the cred is in AWS_CREDENTIALS (see `AuditServiceConfig.validate`)
    cred_name = aws_sqs_config.get("aws_cred")
    aws_creds = config["AWS_CREDENTIALS"][cred_name] if cred_name else {}
    if (
        not aws_creds
        and "aws_access_key_id" in aws_sqs_config
        and "aws_secret_access_key" in aws_sqs_config
    ):
        # for backwards compatibility
        aws_creds = {
            "aws_access_key_id": aws_sqs_config["aws_access_key_id"],
            "aws_secret_access_key": aws_sqs_config["aws_secret_access_key"],
        }
    sqs = boto3.client(
        "sqs",
        region_name=aws_sqs_config["region"],
        aws_access_key_id=aws_creds.get("aws_access_key_id"),
        aws_secret_access_key=aws_creds.get("aws_secret_access_key"),
    )
    sleep_time = config["PULL_FREQUENCY_SECONDS"]
    while True:
        try:
            should_sleep = await pull_from_queue(sqs)
        except asyncio.CancelledError:
            # task cancellation (e.g. on shutdown) must still stop the loop
            raise
        except Exception as e:
            # An unexpected error must not end the loop permanently, which
            # would silently stop audit log ingestion; log it and retry later.
            logger.error(f"Unexpected error while pulling from queue: {e}")
            traceback.print_exc()
            should_sleep = True
        if should_sleep:
            logger.info(f"Sleeping for {sleep_time} seconds...")
            await asyncio.sleep(sleep_time)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc