• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 99d68267-703b-430f-8888-adccf51e571d

18 Apr 2025 08:38AM UTC coverage: 86.269% (-0.01%) from 86.279%
99d68267-703b-430f-8888-adccf51e571d

push

circleci

web-flow
apply fix for podman container labels dict (#12526)

Co-authored-by: Tjeerd Ritsma <tjeerd@playgroundtech.io>

2 of 3 new or added lines in 1 file covered. (66.67%)

28 existing lines in 12 files now uncovered.

63883 of 74051 relevant lines covered (86.27%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.01
/localstack-core/localstack/services/dynamodbstreams/provider.py
1
import copy
1✔
2
import logging
1✔
3

4
from bson.json_util import loads
1✔
5

6
from localstack.aws.api import RequestContext, handler
1✔
7
from localstack.aws.api.dynamodbstreams import (
1✔
8
    DescribeStreamOutput,
9
    DynamodbstreamsApi,
10
    ExpiredIteratorException,
11
    GetRecordsInput,
12
    GetRecordsOutput,
13
    GetShardIteratorOutput,
14
    ListStreamsOutput,
15
    PositiveIntegerObject,
16
    ResourceNotFoundException,
17
    SequenceNumber,
18
    ShardId,
19
    ShardIteratorType,
20
    Stream,
21
    StreamArn,
22
    StreamDescription,
23
    StreamStatus,
24
    TableName,
25
)
26
from localstack.aws.connect import connect_to
1✔
27
from localstack.services.dynamodbstreams.dynamodbstreams_api import (
1✔
28
    get_dynamodbstreams_store,
29
    get_kinesis_client,
30
    get_kinesis_stream_name,
31
    get_shard_id,
32
    kinesis_shard_id,
33
    stream_name_from_stream_arn,
34
    table_name_from_stream_arn,
35
)
36
from localstack.services.plugins import ServiceLifecycleHook
1✔
37
from localstack.utils.collections import select_from_typed_dict
1✔
38

39
# Module-wide logger, named after this module.
LOG = logging.getLogger(__name__)

# Translation table: status reported by the backing Kinesis stream (key) to the
# status exposed for the DynamoDB stream (value).
STREAM_STATUS_MAP = {
    # stream is usable
    "ACTIVE": StreamStatus.ENABLED,
    # stream is being set up or modified: both are reported as ENABLING
    "CREATING": StreamStatus.ENABLING,
    # stream is being torn down
    "DELETING": StreamStatus.DISABLING,
    "UPDATING": StreamStatus.ENABLING,
}
47

48

49
class DynamoDBStreamsProvider(DynamodbstreamsApi, ServiceLifecycleHook):
    """DynamoDB Streams API provider that is served from Kinesis streams.

    Stream metadata is read from the per-account/region DynamoDB Streams store, while
    shards, shard iterators and records are obtained from the Kinesis client returned
    by ``get_kinesis_client``.
    """

    def describe_stream(
        self,
        context: RequestContext,
        stream_arn: StreamArn,
        limit: PositiveIntegerObject = None,
        exclusive_start_shard_id: ShardId = None,
        **kwargs,
    ) -> DescribeStreamOutput:
        """Describe the stream identified by ``stream_arn``.

        The stored stream entry is updated in place with the table's key schema, the
        status translated from the backing Kinesis stream, and the shard list.

        :param context: request context supplying the account id and region
        :param stream_arn: ARN of the stream to describe
        :param limit: accepted for API compatibility; not applied by this implementation
        :param exclusive_start_shard_id: if set, only the shards listed after the shard
            with this id are returned
        :return: the stream description
        :raises ResourceNotFoundException: if no stored stream has the given ARN
        """
        store = get_dynamodbstreams_store(context.account_id, context.region)
        kinesis = get_kinesis_client(account_id=context.account_id, region_name=context.region)

        # the first stored stream with a matching ARN wins
        stream = next(
            (entry for entry in store.ddb_streams.values() if entry["StreamArn"] == stream_arn),
            None,
        )
        if stream is None:
            raise ResourceNotFoundException(
                f"Requested resource not found: Stream: {stream_arn} not found"
            )

        # get stream details
        dynamodb = connect_to(
            aws_access_key_id=context.account_id, region_name=context.region
        ).dynamodb
        table_name = table_name_from_stream_arn(stream["StreamArn"])
        stream_name = get_kinesis_stream_name(table_name)
        stream_details = kinesis.describe_stream(StreamName=stream_name)
        table_details = dynamodb.describe_table(TableName=table_name)
        stream["KeySchema"] = table_details["Table"]["KeySchema"]
        # a Kinesis status without an entry in the map results in ``None``
        stream["StreamStatus"] = STREAM_STATUS_MAP.get(
            stream_details["StreamDescription"]["StreamStatus"]
        )

        # Replace Kinesis ShardIDs with ones that mimic actual DynamoDBStream ShardIDs.
        # Work on a deep copy so the Kinesis response itself is left untouched.
        stream_shards = copy.deepcopy(stream_details["StreamDescription"]["Shards"])
        start_index = 0
        for index, shard in enumerate(stream_shards):
            shard["ShardId"] = get_shard_id(stream, shard["ShardId"])
            shard.pop("HashKeyRange", None)
            # remember where exclusive_start_shard_id was seen, so that this shard and
            # everything before it can be sliced off below
            if exclusive_start_shard_id and exclusive_start_shard_id == shard["ShardId"]:
                start_index = index

        if exclusive_start_shard_id:
            # keep only the shards after exclusive_start_shard_id
            # NOTE(review): if the id matches no shard, start_index is still 0 and the
            # first shard is dropped - confirm whether this is the intended behavior
            stream_shards = stream_shards[start_index + 1 :]

        stream["Shards"] = stream_shards
        stream_description = select_from_typed_dict(StreamDescription, stream)
        return DescribeStreamOutput(StreamDescription=stream_description)

    @handler("GetRecords", expand=False)
    def get_records(self, context: RequestContext, payload: GetRecordsInput) -> GetRecordsOutput:
        """Read records from a shard through the backing Kinesis stream.

        The incoming shard iterator has the form ``<prefix>|<kinesis iterator>``. Only the
        part after the last ``|`` is forwarded to Kinesis (``payload`` is modified in place),
        and the prefix is re-attached to the iterator returned to the caller.

        :param context: request context supplying the account id and region
        :param payload: the raw GetRecords request
        :return: the decoded records, plus ``NextShardIterator`` if Kinesis returned one
        :raises ExpiredIteratorException: if Kinesis reports the iterator as expired
        """
        kinesis = get_kinesis_client(account_id=context.account_id, region_name=context.region)
        prefix, _, payload["ShardIterator"] = payload["ShardIterator"].rpartition("|")
        try:
            kinesis_records = kinesis.get_records(**payload)
        except kinesis.exceptions.ExpiredIteratorException as e:
            LOG.debug("Shard iterator for underlying kinesis stream expired")
            raise ExpiredIteratorException("Shard iterator has expired") from e

        records = []
        for record in kinesis_records["Records"]:
            record_data = loads(record["Data"])
            # expose the Kinesis sequence number as the record's sequence number
            record_data["dynamodb"]["SequenceNumber"] = record["SequenceNumber"]
            records.append(record_data)

        result = {"Records": records}
        # Only pass on an iterator that actually exists: without this check a missing
        # Kinesis iterator would be rendered as the literal string "<prefix>|None",
        # which clients cannot distinguish from a usable iterator.
        next_shard_iterator = kinesis_records.get("NextShardIterator")
        if next_shard_iterator:
            result["NextShardIterator"] = f"{prefix}|{next_shard_iterator}"
        return GetRecordsOutput(**result)

    def get_shard_iterator(
        self,
        context: RequestContext,
        stream_arn: StreamArn,
        shard_id: ShardId,
        shard_iterator_type: ShardIteratorType,
        sequence_number: SequenceNumber = None,
        **kwargs,
    ) -> GetShardIteratorOutput:
        """Create a shard iterator by requesting one from the backing Kinesis stream.

        :param context: request context supplying the account id and region
        :param stream_arn: ARN of the stream; also used as prefix of the returned iterator
        :param shard_id: shard id as exposed by this API; translated with ``kinesis_shard_id``
        :param shard_iterator_type: iterator type, forwarded to Kinesis unchanged
        :param sequence_number: forwarded as ``StartingSequenceNumber`` when set
        :return: the Kinesis iterator, prefixed with ``<stream_arn>|1|``
        """
        stream_name = stream_name_from_stream_arn(stream_arn)
        stream_shard_id = kinesis_shard_id(shard_id)
        kinesis = get_kinesis_client(account_id=context.account_id, region_name=context.region)

        # separate local name, so that the method's own **kwargs is not rebound
        sequence_params = {"StartingSequenceNumber": sequence_number} if sequence_number else {}
        result = kinesis.get_shard_iterator(
            StreamName=stream_name,
            ShardId=stream_shard_id,
            ShardIteratorType=shard_iterator_type,
            **sequence_params,
        )
        # client response metadata is not part of the API output
        result.pop("ResponseMetadata", None)
        # TODO not quite clear what the |1| exactly denotes, because at AWS it's sometimes other numbers
        result["ShardIterator"] = f"{stream_arn}|1|{result['ShardIterator']}"
        return GetShardIteratorOutput(**result)

    def list_streams(
        self,
        context: RequestContext,
        table_name: TableName = None,
        limit: PositiveIntegerObject = None,
        exclusive_start_stream_arn: StreamArn = None,
        **kwargs,
    ) -> ListStreamsOutput:
        """List the streams stored for the request's account and region.

        :param context: request context supplying the account id and region
        :param table_name: if set, only streams of this table are returned
        :param limit: accepted for API compatibility; not applied by this implementation
        :param exclusive_start_stream_arn: accepted for API compatibility; not applied by
            this implementation
        :return: the matching streams, reduced to the attributes of ``Stream``
        """
        store = get_dynamodbstreams_store(context.account_id, context.region)
        result = [select_from_typed_dict(Stream, res) for res in store.ddb_streams.values()]
        if table_name:
            result = [res for res in result if res["TableName"] == table_name]
        return ListStreamsOutput(Streams=result)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc