• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 6523d139-4c8d-4daf-a514-97baaa202bd9

04 Jun 2025 04:30PM UTC coverage: 86.762% (-0.006%) from 86.768%
6523d139-4c8d-4daf-a514-97baaa202bd9

push

circleci

web-flow
test(esm/sqs): Skip flaky test_report_batch_item_failures test (#12713)

65076 of 75005 relevant lines covered (86.76%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.7
/localstack-core/localstack/services/dynamodbstreams/provider.py
1
import copy
1✔
2
import logging
1✔
3

4
from bson.json_util import loads
1✔
5

6
from localstack.aws.api import RequestContext, handler
1✔
7
from localstack.aws.api.dynamodbstreams import (
1✔
8
    DescribeStreamOutput,
9
    DynamodbstreamsApi,
10
    ExpiredIteratorException,
11
    GetRecordsInput,
12
    GetRecordsOutput,
13
    GetShardIteratorOutput,
14
    ListStreamsOutput,
15
    PositiveIntegerObject,
16
    ResourceNotFoundException,
17
    SequenceNumber,
18
    ShardId,
19
    ShardIteratorType,
20
    Stream,
21
    StreamArn,
22
    StreamDescription,
23
    StreamStatus,
24
    TableName,
25
)
26
from localstack.aws.connect import connect_to
1✔
27
from localstack.services.dynamodb.utils import change_region_in_ddb_stream_arn
1✔
28
from localstack.services.dynamodbstreams.dynamodbstreams_api import (
1✔
29
    get_dynamodbstreams_store,
30
    get_kinesis_client,
31
    get_kinesis_stream_name,
32
    get_original_region,
33
    get_shard_id,
34
    kinesis_shard_id,
35
    stream_name_from_stream_arn,
36
    table_name_from_stream_arn,
37
)
38
from localstack.services.plugins import ServiceLifecycleHook
1✔
39
from localstack.utils.collections import select_from_typed_dict
1✔
40

41
LOG = logging.getLogger(__name__)
1✔
42

43
STREAM_STATUS_MAP = {
1✔
44
    "ACTIVE": StreamStatus.ENABLED,
45
    "CREATING": StreamStatus.ENABLING,
46
    "DELETING": StreamStatus.DISABLING,
47
    "UPDATING": StreamStatus.ENABLING,
48
}
49

50

51
class DynamoDBStreamsProvider(DynamodbstreamsApi, ServiceLifecycleHook):
1✔
52
    shard_to_region: dict[str, str]
1✔
53
    """Map a shard iterator to the originating region. This is used in case of replica tables, as LocalStack keeps the
1✔
54
    data in one region only, redirecting all the requests from replica regions."""
55

56
    def __init__(self):
1✔
57
        self.shard_to_region = {}
1✔
58

59
    def describe_stream(
1✔
60
        self,
61
        context: RequestContext,
62
        stream_arn: StreamArn,
63
        limit: PositiveIntegerObject = None,
64
        exclusive_start_shard_id: ShardId = None,
65
        **kwargs,
66
    ) -> DescribeStreamOutput:
67
        og_region = get_original_region(context=context, stream_arn=stream_arn)
1✔
68
        store = get_dynamodbstreams_store(context.account_id, og_region)
1✔
69
        kinesis = get_kinesis_client(account_id=context.account_id, region_name=og_region)
1✔
70
        for stream in store.ddb_streams.values():
1✔
71
            _stream_arn = stream_arn
1✔
72
            if context.region != og_region:
1✔
73
                _stream_arn = change_region_in_ddb_stream_arn(_stream_arn, og_region)
1✔
74
            if stream["StreamArn"] == _stream_arn:
1✔
75
                # get stream details
76
                dynamodb = connect_to(
1✔
77
                    aws_access_key_id=context.account_id, region_name=og_region
78
                ).dynamodb
79
                table_name = table_name_from_stream_arn(stream["StreamArn"])
1✔
80
                stream_name = get_kinesis_stream_name(table_name)
1✔
81
                stream_details = kinesis.describe_stream(StreamName=stream_name)
1✔
82
                table_details = dynamodb.describe_table(TableName=table_name)
1✔
83
                stream["KeySchema"] = table_details["Table"]["KeySchema"]
1✔
84
                stream["StreamStatus"] = STREAM_STATUS_MAP.get(
1✔
85
                    stream_details["StreamDescription"]["StreamStatus"]
86
                )
87

88
                # Replace Kinesis ShardIDs with ones that mimic actual
89
                # DynamoDBStream ShardIDs.
90
                stream_shards = copy.deepcopy(stream_details["StreamDescription"]["Shards"])
1✔
91
                start_index = 0
1✔
92
                for index, shard in enumerate(stream_shards):
1✔
93
                    shard["ShardId"] = get_shard_id(stream, shard["ShardId"])
1✔
94
                    shard.pop("HashKeyRange", None)
1✔
95
                    # we want to ignore the shards before exclusive_start_shard_id parameters
96
                    # we store the index where we encounter then slice the shards
97
                    if exclusive_start_shard_id and exclusive_start_shard_id == shard["ShardId"]:
1✔
98
                        start_index = index
1✔
99

100
                if exclusive_start_shard_id:
1✔
101
                    # slicing the resulting shards after the exclusive_start_shard_id parameters
102
                    stream_shards = stream_shards[start_index + 1 :]
1✔
103

104
                stream["Shards"] = stream_shards
1✔
105
                stream_description = select_from_typed_dict(StreamDescription, stream)
1✔
106
                stream_description["StreamArn"] = _stream_arn
1✔
107
                return DescribeStreamOutput(StreamDescription=stream_description)
1✔
108

109
        raise ResourceNotFoundException(
1✔
110
            f"Requested resource not found: Stream: {stream_arn} not found"
111
        )
112

113
    @handler("GetRecords", expand=False)
1✔
114
    def get_records(self, context: RequestContext, payload: GetRecordsInput) -> GetRecordsOutput:
1✔
115
        _shard_iterator = payload["ShardIterator"]
1✔
116
        region_name = context.region
1✔
117
        if payload["ShardIterator"] in self.shard_to_region:
1✔
118
            region_name = self.shard_to_region[_shard_iterator]
1✔
119

120
        kinesis = get_kinesis_client(account_id=context.account_id, region_name=region_name)
1✔
121
        prefix, _, payload["ShardIterator"] = _shard_iterator.rpartition("|")
1✔
122
        try:
1✔
123
            kinesis_records = kinesis.get_records(**payload)
1✔
124
        except kinesis.exceptions.ExpiredIteratorException:
1✔
125
            self.shard_to_region.pop(_shard_iterator, None)
×
126
            LOG.debug("Shard iterator for underlying kinesis stream expired")
×
127
            raise ExpiredIteratorException("Shard iterator has expired")
×
128
        result = {
1✔
129
            "Records": [],
130
            "NextShardIterator": f"{prefix}|{kinesis_records.get('NextShardIterator')}",
131
        }
132
        for record in kinesis_records["Records"]:
1✔
133
            record_data = loads(record["Data"])
1✔
134
            record_data["dynamodb"]["SequenceNumber"] = record["SequenceNumber"]
1✔
135
            result["Records"].append(record_data)
1✔
136

137
        # Similar as the logic in GetShardIterator, we need to track the originating region when we get the
138
        # NextShardIterator in the results.
139
        if region_name != context.region and "NextShardIterator" in result:
1✔
140
            self.shard_to_region[result["NextShardIterator"]] = region_name
1✔
141
        return GetRecordsOutput(**result)
1✔
142

143
    def get_shard_iterator(
1✔
144
        self,
145
        context: RequestContext,
146
        stream_arn: StreamArn,
147
        shard_id: ShardId,
148
        shard_iterator_type: ShardIteratorType,
149
        sequence_number: SequenceNumber = None,
150
        **kwargs,
151
    ) -> GetShardIteratorOutput:
152
        stream_name = stream_name_from_stream_arn(stream_arn)
1✔
153
        og_region = get_original_region(context=context, stream_arn=stream_arn)
1✔
154
        stream_shard_id = kinesis_shard_id(shard_id)
1✔
155
        kinesis = get_kinesis_client(account_id=context.account_id, region_name=og_region)
1✔
156

157
        kwargs = {"StartingSequenceNumber": sequence_number} if sequence_number else {}
1✔
158
        result = kinesis.get_shard_iterator(
1✔
159
            StreamName=stream_name,
160
            ShardId=stream_shard_id,
161
            ShardIteratorType=shard_iterator_type,
162
            **kwargs,
163
        )
164
        del result["ResponseMetadata"]
1✔
165
        # TODO not quite clear what the |1| exactly denotes, because at AWS it's sometimes other numbers
166
        result["ShardIterator"] = f"{stream_arn}|1|{result['ShardIterator']}"
1✔
167

168
        # In case of a replica table, we need to keep track of the real region originating the shard iterator.
169
        # This region will be later used in GetRecords to redirect to the originating region, holding the data.
170
        if og_region != context.region:
1✔
171
            self.shard_to_region[result["ShardIterator"]] = og_region
1✔
172
        return GetShardIteratorOutput(**result)
1✔
173

174
    def list_streams(
1✔
175
        self,
176
        context: RequestContext,
177
        table_name: TableName = None,
178
        limit: PositiveIntegerObject = None,
179
        exclusive_start_stream_arn: StreamArn = None,
180
        **kwargs,
181
    ) -> ListStreamsOutput:
182
        og_region = get_original_region(context=context, table_name=table_name)
1✔
183
        store = get_dynamodbstreams_store(context.account_id, og_region)
1✔
184
        result = [select_from_typed_dict(Stream, res) for res in store.ddb_streams.values()]
1✔
185
        if table_name:
1✔
186
            result: list[Stream] = [res for res in result if res["TableName"] == table_name]
1✔
187
            # If this is a stream from a table replica, we need to change the region in the stream ARN, as LocalStack
188
            # keeps a stream only in the originating region.
189
            if context.region != og_region:
1✔
190
                for stream in result:
1✔
191
                    stream["StreamArn"] = change_region_in_ddb_stream_arn(
1✔
192
                        stream["StreamArn"], context.region
193
                    )
194

195
        return ListStreamsOutput(Streams=result)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc