• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 21765279428

06 Feb 2026 08:43PM UTC coverage: 73.535% (-13.3%) from 86.871%
21765279428

Pull #13716

github

web-flow
Merge 0f5988375 into 20cc1b384
Pull Request #13716: Events: improve Store IAM Statement typing

6 of 6 new or added lines in 1 file covered. (100.0%)

9920 existing lines in 273 files now uncovered.

56187 of 76409 relevant lines covered (73.53%)

0.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

18.52
/localstack-core/localstack/utils/aws/queries.py
1
from localstack.aws.connect import connect_to
1✔
2
from localstack.utils.aws.arns import extract_region_from_arn, sqs_queue_url_for_arn
1✔
3
from localstack.utils.strings import to_str
1✔
4

5

6
def sqs_receive_message(queue_arn):
1✔
UNCOV
7
    region_name = extract_region_from_arn(queue_arn)
×
UNCOV
8
    client = connect_to(region_name=region_name).sqs
×
UNCOV
9
    queue_url = sqs_queue_url_for_arn(queue_arn)
×
UNCOV
10
    response = client.receive_message(QueueUrl=queue_url)
×
UNCOV
11
    return response
×
12

13

14
def kinesis_get_latest_records(
1✔
15
    stream_name: str, shard_id: str, count: int = 10, client=None
16
) -> list[dict]:
UNCOV
17
    kinesis = client or connect_to().kinesis
×
UNCOV
18
    result = []
×
UNCOV
19
    response = kinesis.get_shard_iterator(
×
20
        StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON"
21
    )
UNCOV
22
    shard_iterator = response["ShardIterator"]
×
UNCOV
23
    while shard_iterator:
×
UNCOV
24
        records_response = kinesis.get_records(ShardIterator=shard_iterator)
×
UNCOV
25
        records = records_response["Records"]
×
UNCOV
26
        for record in records:
×
UNCOV
27
            try:
×
UNCOV
28
                record["Data"] = to_str(record["Data"])
×
29
            except Exception:
×
30
                pass
×
UNCOV
31
        result.extend(records)
×
UNCOV
32
        shard_iterator = records_response["NextShardIterator"] if records else False
×
UNCOV
33
        while len(result) > count:
×
34
            result.pop(0)
×
UNCOV
35
    return result
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc