• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19844934392

01 Dec 2025 07:55PM UTC coverage: 86.945% (+0.1%) from 86.821%
19844934392

push

github

web-flow
Update ASF APIs, provider signatures, disable lambda patches (#13444)

Co-authored-by: LocalStack Bot <localstack-bot@users.noreply.github.com>
Co-authored-by: Silvio Vasiljevic <silvio.vasiljevic@gmail.com>

69707 of 80174 relevant lines covered (86.94%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.74
/localstack-core/localstack/services/dynamodbstreams/provider.py
1
import copy
1✔
2
import logging
1✔
3

4
from bson.json_util import loads
1✔
5

6
from localstack.aws.api import RequestContext, handler
1✔
7
from localstack.aws.api.dynamodbstreams import (
1✔
8
    DescribeStreamOutput,
9
    DynamodbstreamsApi,
10
    ExpiredIteratorException,
11
    GetRecordsInput,
12
    GetRecordsOutput,
13
    GetShardIteratorOutput,
14
    ListStreamsOutput,
15
    PositiveIntegerObject,
16
    ResourceNotFoundException,
17
    SequenceNumber,
18
    ShardFilter,
19
    ShardId,
20
    ShardIteratorType,
21
    Stream,
22
    StreamArn,
23
    StreamDescription,
24
    StreamStatus,
25
    TableName,
26
)
27
from localstack.aws.connect import connect_to
1✔
28
from localstack.services.dynamodb.utils import change_region_in_ddb_stream_arn
1✔
29
from localstack.services.dynamodbstreams.dynamodbstreams_api import (
1✔
30
    get_dynamodbstreams_store,
31
    get_kinesis_client,
32
    get_kinesis_stream_name,
33
    get_original_region,
34
    get_shard_id,
35
    kinesis_shard_id,
36
    stream_name_from_stream_arn,
37
    table_name_from_stream_arn,
38
)
39
from localstack.services.plugins import ServiceLifecycleHook
1✔
40
from localstack.state import StateVisitor
1✔
41
from localstack.utils.collections import select_from_typed_dict
1✔
42

43
LOG = logging.getLogger(__name__)
1✔
44

45
STREAM_STATUS_MAP = {
1✔
46
    "ACTIVE": StreamStatus.ENABLED,
47
    "CREATING": StreamStatus.ENABLING,
48
    "DELETING": StreamStatus.DISABLING,
49
    "UPDATING": StreamStatus.ENABLING,
50
}
51

52

53
class DynamoDBStreamsProvider(DynamodbstreamsApi, ServiceLifecycleHook):
1✔
54
    shard_to_region: dict[str, str]
1✔
55
    """Map a shard iterator to the originating region. This is used in case of replica tables, as LocalStack keeps the
1✔
56
    data in one region only, redirecting all the requests from replica regions."""
57

58
    def __init__(self):
1✔
59
        self.shard_to_region = {}
1✔
60

61
    def accept_state_visitor(self, visitor: StateVisitor):
1✔
62
        from localstack.services.dynamodbstreams.models import dynamodbstreams_stores
×
63

64
        visitor.visit(dynamodbstreams_stores)
×
65

66
    def describe_stream(
1✔
67
        self,
68
        context: RequestContext,
69
        stream_arn: StreamArn,
70
        limit: PositiveIntegerObject | None = None,
71
        exclusive_start_shard_id: ShardId | None = None,
72
        shard_filter: ShardFilter | None = None,
73
        **kwargs,
74
    ) -> DescribeStreamOutput:
75
        # TODO add support for shard_filter
76
        og_region = get_original_region(context=context, stream_arn=stream_arn)
1✔
77
        store = get_dynamodbstreams_store(context.account_id, og_region)
1✔
78
        kinesis = get_kinesis_client(account_id=context.account_id, region_name=og_region)
1✔
79
        for stream in store.ddb_streams.values():
1✔
80
            _stream_arn = stream_arn
1✔
81
            if context.region != og_region:
1✔
82
                _stream_arn = change_region_in_ddb_stream_arn(_stream_arn, og_region)
1✔
83
            if stream["StreamArn"] == _stream_arn:
1✔
84
                # get stream details
85
                dynamodb = connect_to(
1✔
86
                    aws_access_key_id=context.account_id, region_name=og_region
87
                ).dynamodb
88
                table_name = table_name_from_stream_arn(stream["StreamArn"])
1✔
89
                stream_name = get_kinesis_stream_name(table_name)
1✔
90
                stream_details = kinesis.describe_stream(StreamName=stream_name)
1✔
91
                table_details = dynamodb.describe_table(TableName=table_name)
1✔
92
                stream["KeySchema"] = table_details["Table"]["KeySchema"]
1✔
93
                stream["StreamStatus"] = STREAM_STATUS_MAP.get(
1✔
94
                    stream_details["StreamDescription"]["StreamStatus"]
95
                )
96

97
                # Replace Kinesis ShardIDs with ones that mimic actual
98
                # DynamoDBStream ShardIDs.
99
                stream_shards = copy.deepcopy(stream_details["StreamDescription"]["Shards"])
1✔
100
                start_index = 0
1✔
101
                for index, shard in enumerate(stream_shards):
1✔
102
                    shard["ShardId"] = get_shard_id(stream, shard["ShardId"])
1✔
103
                    shard.pop("HashKeyRange", None)
1✔
104
                    # we want to ignore the shards before exclusive_start_shard_id parameters
105
                    # we store the index where we encounter then slice the shards
106
                    if exclusive_start_shard_id and exclusive_start_shard_id == shard["ShardId"]:
1✔
107
                        start_index = index
1✔
108

109
                if exclusive_start_shard_id:
1✔
110
                    # slicing the resulting shards after the exclusive_start_shard_id parameters
111
                    stream_shards = stream_shards[start_index + 1 :]
1✔
112

113
                stream["Shards"] = stream_shards
1✔
114
                stream_description = select_from_typed_dict(StreamDescription, stream)
1✔
115
                stream_description["StreamArn"] = _stream_arn
1✔
116
                return DescribeStreamOutput(StreamDescription=stream_description)
1✔
117

118
        raise ResourceNotFoundException(
1✔
119
            f"Requested resource not found: Stream: {stream_arn} not found"
120
        )
121

122
    @handler("GetRecords", expand=False)
1✔
123
    def get_records(self, context: RequestContext, payload: GetRecordsInput) -> GetRecordsOutput:
1✔
124
        _shard_iterator = payload["ShardIterator"]
1✔
125
        region_name = context.region
1✔
126
        if payload["ShardIterator"] in self.shard_to_region:
1✔
127
            region_name = self.shard_to_region[_shard_iterator]
1✔
128

129
        kinesis = get_kinesis_client(account_id=context.account_id, region_name=region_name)
1✔
130
        prefix, _, payload["ShardIterator"] = _shard_iterator.rpartition("|")
1✔
131
        try:
1✔
132
            kinesis_records = kinesis.get_records(**payload)
1✔
133
        except kinesis.exceptions.ExpiredIteratorException:
1✔
134
            self.shard_to_region.pop(_shard_iterator, None)
×
135
            LOG.debug("Shard iterator for underlying kinesis stream expired")
×
136
            raise ExpiredIteratorException("Shard iterator has expired")
×
137
        result = {
1✔
138
            "Records": [],
139
            "NextShardIterator": f"{prefix}|{kinesis_records.get('NextShardIterator')}",
140
        }
141
        for record in kinesis_records["Records"]:
1✔
142
            record_data = loads(record["Data"])
1✔
143
            record_data["dynamodb"]["SequenceNumber"] = record["SequenceNumber"]
1✔
144
            result["Records"].append(record_data)
1✔
145

146
        # Similar as the logic in GetShardIterator, we need to track the originating region when we get the
147
        # NextShardIterator in the results.
148
        if region_name != context.region and "NextShardIterator" in result:
1✔
149
            self.shard_to_region[result["NextShardIterator"]] = region_name
1✔
150
        return GetRecordsOutput(**result)
1✔
151

152
    def get_shard_iterator(
1✔
153
        self,
154
        context: RequestContext,
155
        stream_arn: StreamArn,
156
        shard_id: ShardId,
157
        shard_iterator_type: ShardIteratorType,
158
        sequence_number: SequenceNumber = None,
159
        **kwargs,
160
    ) -> GetShardIteratorOutput:
161
        stream_name = stream_name_from_stream_arn(stream_arn)
1✔
162
        og_region = get_original_region(context=context, stream_arn=stream_arn)
1✔
163
        stream_shard_id = kinesis_shard_id(shard_id)
1✔
164
        kinesis = get_kinesis_client(account_id=context.account_id, region_name=og_region)
1✔
165

166
        kwargs = {"StartingSequenceNumber": sequence_number} if sequence_number else {}
1✔
167
        result = kinesis.get_shard_iterator(
1✔
168
            StreamName=stream_name,
169
            ShardId=stream_shard_id,
170
            ShardIteratorType=shard_iterator_type,
171
            **kwargs,
172
        )
173
        del result["ResponseMetadata"]
1✔
174
        # TODO not quite clear what the |1| exactly denotes, because at AWS it's sometimes other numbers
175
        result["ShardIterator"] = f"{stream_arn}|1|{result['ShardIterator']}"
1✔
176

177
        # In case of a replica table, we need to keep track of the real region originating the shard iterator.
178
        # This region will be later used in GetRecords to redirect to the originating region, holding the data.
179
        if og_region != context.region:
1✔
180
            self.shard_to_region[result["ShardIterator"]] = og_region
1✔
181
        return GetShardIteratorOutput(**result)
1✔
182

183
    def list_streams(
1✔
184
        self,
185
        context: RequestContext,
186
        table_name: TableName = None,
187
        limit: PositiveIntegerObject = None,
188
        exclusive_start_stream_arn: StreamArn = None,
189
        **kwargs,
190
    ) -> ListStreamsOutput:
191
        og_region = get_original_region(context=context, table_name=table_name)
1✔
192
        store = get_dynamodbstreams_store(context.account_id, og_region)
1✔
193
        result = [select_from_typed_dict(Stream, res) for res in store.ddb_streams.values()]
1✔
194
        if table_name:
1✔
195
            result: list[Stream] = [res for res in result if res["TableName"] == table_name]
1✔
196
            # If this is a stream from a table replica, we need to change the region in the stream ARN, as LocalStack
197
            # keeps a stream only in the originating region.
198
            if context.region != og_region:
1✔
199
                for stream in result:
1✔
200
                    stream["StreamArn"] = change_region_in_ddb_stream_arn(
1✔
201
                        stream["StreamArn"], context.region
202
                    )
203

204
        return ListStreamsOutput(Streams=result)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc