• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uc-cdis / audit-service / 8668427090

04 Mar 2024 10:55PM UTC coverage: 77.309% (-11.8%) from 89.157%
8668427090

push

github

web-flow
PPS-747 Update fastapi (#49)

1 of 1 new or added line in 1 file covered. (100.0%)

59 existing lines in 4 files now uncovered.

385 of 498 relevant lines covered (77.31%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

18.64
/src/audit/pull_from_queue.py
1
import asyncio
1✔
2
import boto3
1✔
3
import json
1✔
4
import traceback
1✔
5

6
from . import logger
1✔
7
from .config import config
1✔
8
from .models import CATEGORY_TO_MODEL_CLASS
1✔
9
from .routes.maintain import insert_row, validate_presigned_url_log, validate_login_log
1✔
10

11

12
async def process_log(data, timestamp):
    """
    Validate a single audit log and insert it in the database.

    Args:
        data (dict): the audit log. Its "category" key is removed from the
            dict (the dict is modified in place) and selects which
            validation is applied.
        timestamp: value stored under `data["timestamp"]` when the log does
            not already carry a truthy "timestamp".

    Raises:
        KeyError: if `data` has no "category" key.
        AssertionError: if the category is empty or is not in
            `CATEGORY_TO_MODEL_CLASS`.

    Exceptions raised by the validation functions or by `insert_row` are not
    caught here and propagate to the caller.
    """
    # check log category
    category = data.pop("category")
    if not category or category not in CATEGORY_TO_MODEL_CLASS:
        # Raised explicitly instead of through an `assert` statement so that
        # the check is still enforced when Python runs with `-O` (which
        # strips asserts). The exception type and message are unchanged.
        raise AssertionError(f"Unknown log category {category}")

    # if the timestamp was not provided, default to the message's SentTimestamp
    if not data.get("timestamp"):
        data["timestamp"] = timestamp

    # validate log (categories other than these two are inserted without
    # category-specific validation)
    if category == "presigned_url":
        validate_presigned_url_log(data)
    elif category == "login":
        validate_login_log(data)

    # insert log in DB
    await insert_row(category, data)
×
31

32

33
async def pull_from_queue(sqs):
    """
    Receive one batch of messages from the configured SQS queue, process each
    of them with `process_log`, and delete the ones that were processed
    successfully.

    Errors raised while receiving, parsing, processing or deleting a message
    are logged and recorded as a failure instead of being raised. A message
    that could not be parsed or processed is not deleted from the queue.

    Args:
        sqs: client object exposing `receive_message` and `delete_message`
            (created with `boto3.client("sqs", ...)` in
            `pull_from_queue_loop`).

    Returns:
        bool: True if the caller should sleep before pulling again, i.e. if
        no message was received or if anything failed; False otherwise.
    """
    failed = False
    messages = []
    try:
        response = sqs.receive_message(
            QueueUrl=config["QUEUE_CONFIG"]["aws_sqs_config"]["sqs_url"],
            MaxNumberOfMessages=10,  # 10 is the max allowed by AWS
            AttributeNames=["SentTimestamp"],
        )
        messages = response.get("Messages", [])
    except Exception as e:
        failed = True
        logger.error(f"Error pulling from queue: {e}")
        traceback.print_exc()

    for message in messages:
        try:
            # Parsing happens inside the `try` so that a malformed message
            # (invalid JSON body, missing attribute) is logged and skipped
            # like any other processing error, instead of raising out of
            # this function and stopping the polling loop.
            data = json.loads(message["Body"])
            receipt_handle = message["ReceiptHandle"]
            # when the message was sent to the queue
            sent_timestamp = message["Attributes"]["SentTimestamp"]
            timestamp = int(sent_timestamp) // 1000  # ms to s
            await process_log(data, timestamp)
        except Exception as e:
            failed = True
            logger.error(f"Error processing audit log: {e}")
            traceback.print_exc()
            continue

        # delete message from queue once successfully processed
        try:
            sqs.delete_message(
                QueueUrl=config["QUEUE_CONFIG"]["aws_sqs_config"]["sqs_url"],
                ReceiptHandle=receipt_handle,
            )
        except Exception as e:
            failed = True
            logger.error(f"Error deleting message from queue: {e}")
            traceback.print_exc()

    # if the queue is empty, or we failed to process a message: sleep
    should_sleep = not messages or failed
    return should_sleep
×
75

76

77
async def pull_from_queue_loop():
    """
    Create an SQS client from the service configuration, then call
    `pull_from_queue` forever, sleeping for `PULL_FREQUENCY_SECONDS` whenever
    it reports that a pause is needed.

    Note that `pull_from_queue_loop` and `pull_from_queue` only handle
    AWS SQS right now.
    """
    logger.info("Starting to pull from queue...")
    aws_sqs_config = config["QUEUE_CONFIG"]["aws_sqs_config"]

    # Named credentials take priority.
    # we know the cred is in AWS_CREDENTIALS (see `AuditServiceConfig.validate`)
    aws_creds = {}
    if "aws_cred" in aws_sqs_config and aws_sqs_config["aws_cred"]:
        aws_creds = config["AWS_CREDENTIALS"][aws_sqs_config["aws_cred"]]

    if not aws_creds:
        # for backwards compatibility: fall back to keys stored directly in
        # the SQS config, but only when both of them are present
        legacy_keys = ("aws_access_key_id", "aws_secret_access_key")
        if all(key in aws_sqs_config for key in legacy_keys):
            aws_creds = {key: aws_sqs_config[key] for key in legacy_keys}

    # When no credentials were found, both key arguments are passed as None.
    client_kwargs = {
        "region_name": aws_sqs_config["region"],
        "aws_access_key_id": aws_creds.get("aws_access_key_id"),
        "aws_secret_access_key": aws_creds.get("aws_secret_access_key"),
    }
    sqs = boto3.client("sqs", **client_kwargs)

    sleep_time = config["PULL_FREQUENCY_SECONDS"]
    while True:
        if not await pull_from_queue(sqs):
            # nothing asked for a pause: pull the next batch right away
            continue
        logger.info(f"Sleeping for {sleep_time} seconds...")
        await asyncio.sleep(sleep_time)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc