• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 22048723723

13 Feb 2026 06:59PM UTC coverage: 87.006% (+0.1%) from 86.883%
22048723723

push

github

web-flow
CW Logs: Test suite for service internalization (#13692)

22 of 22 new or added lines in 1 file covered. (100.0%)

928 existing lines in 33 files now uncovered.

69716 of 80128 relevant lines covered (87.01%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.74
/localstack-core/localstack/services/dynamodbstreams/provider.py
1
import copy
1✔
2
import logging
1✔
3

4
from bson.json_util import loads
1✔
5

6
from localstack.aws.api import RequestContext, handler
1✔
7
from localstack.aws.api.dynamodbstreams import (
1✔
8
    DescribeStreamOutput,
9
    DynamodbstreamsApi,
10
    ExpiredIteratorException,
11
    GetRecordsInput,
12
    GetRecordsOutput,
13
    GetShardIteratorOutput,
14
    ListStreamsOutput,
15
    PositiveIntegerObject,
16
    ResourceNotFoundException,
17
    SequenceNumber,
18
    ShardFilter,
19
    ShardId,
20
    ShardIteratorType,
21
    Stream,
22
    StreamArn,
23
    StreamStatus,
24
    TableName,
25
)
26
from localstack.aws.connect import connect_to
1✔
27
from localstack.services.dynamodb.utils import change_region_in_ddb_stream_arn
1✔
28
from localstack.services.dynamodbstreams.dynamodbstreams_api import (
1✔
29
    get_dynamodbstreams_store,
30
    get_kinesis_client,
31
    get_kinesis_stream_name,
32
    get_original_region,
33
    get_shard_id,
34
    kinesis_shard_id,
35
    stream_name_from_stream_arn,
36
    table_name_from_stream_arn,
37
)
38
from localstack.services.plugins import ServiceLifecycleHook
1✔
39
from localstack.state import StateVisitor
1✔
40
from localstack.utils.collections import select_from_typed_dict
1✔
41

42
LOG = logging.getLogger(__name__)
1✔
43

44
STREAM_STATUS_MAP = {
1✔
45
    "ACTIVE": StreamStatus.ENABLED,
46
    "CREATING": StreamStatus.ENABLING,
47
    "DELETING": StreamStatus.DISABLING,
48
    "UPDATING": StreamStatus.ENABLING,
49
}
50

51

52
class DynamoDBStreamsProvider(DynamodbstreamsApi, ServiceLifecycleHook):
1✔
53
    shard_to_region: dict[str, str]
1✔
54
    """Map a shard iterator to the originating region. This is used in case of replica tables, as LocalStack keeps the
1✔
55
    data in one region only, redirecting all the requests from replica regions."""
56

57
    def __init__(self):
1✔
58
        self.shard_to_region = {}
1✔
59

60
    def accept_state_visitor(self, visitor: StateVisitor):
1✔
UNCOV
61
        from localstack.services.dynamodbstreams.models import dynamodbstreams_stores
×
62

UNCOV
63
        visitor.visit(dynamodbstreams_stores)
×
64

65
    def describe_stream(
1✔
66
        self,
67
        context: RequestContext,
68
        stream_arn: StreamArn,
69
        limit: PositiveIntegerObject | None = None,
70
        exclusive_start_shard_id: ShardId | None = None,
71
        shard_filter: ShardFilter | None = None,
72
        **kwargs,
73
    ) -> DescribeStreamOutput:
74
        # TODO add support for shard_filter
75
        og_region = get_original_region(context=context, stream_arn=stream_arn)
1✔
76
        store = get_dynamodbstreams_store(context.account_id, og_region)
1✔
77
        kinesis = get_kinesis_client(account_id=context.account_id, region_name=og_region)
1✔
78
        for stream in store.ddb_streams.values():
1✔
79
            stream_description = stream.StreamDescription
1✔
80
            _stream_arn = stream_arn
1✔
81
            if context.region != og_region:
1✔
82
                _stream_arn = change_region_in_ddb_stream_arn(_stream_arn, og_region)
1✔
83
            if stream_description["StreamArn"] == _stream_arn:
1✔
84
                # get stream details
85
                dynamodb = connect_to(
1✔
86
                    aws_access_key_id=context.account_id, region_name=og_region
87
                ).dynamodb
88
                table_name = table_name_from_stream_arn(stream_description["StreamArn"])
1✔
89
                stream_name = get_kinesis_stream_name(table_name)
1✔
90
                stream_details = kinesis.describe_stream(StreamName=stream_name)
1✔
91
                table_details = dynamodb.describe_table(TableName=table_name)
1✔
92
                stream_description["KeySchema"] = table_details["Table"]["KeySchema"]
1✔
93
                stream_description["StreamStatus"] = STREAM_STATUS_MAP.get(
1✔
94
                    stream_details["StreamDescription"]["StreamStatus"]
95
                )
96

97
                # Replace Kinesis ShardIDs with ones that mimic actual
98
                # DynamoDBStream ShardIDs.
99
                stream_shards = copy.deepcopy(stream_details["StreamDescription"]["Shards"])
1✔
100
                start_index = 0
1✔
101
                for index, shard in enumerate(stream_shards):
1✔
102
                    shard["ShardId"] = get_shard_id(stream, shard["ShardId"])
1✔
103
                    shard.pop("HashKeyRange", None)
1✔
104
                    # we want to ignore the shards before exclusive_start_shard_id parameters
105
                    # we store the index where we encounter then slice the shards
106
                    if exclusive_start_shard_id and exclusive_start_shard_id == shard["ShardId"]:
1✔
107
                        start_index = index
1✔
108

109
                if exclusive_start_shard_id:
1✔
110
                    # slicing the resulting shards after the exclusive_start_shard_id parameters
111
                    stream_shards = stream_shards[start_index + 1 :]
1✔
112

113
                stream_description["Shards"] = stream_shards
1✔
114
                stream_description["StreamArn"] = _stream_arn
1✔
115
                return DescribeStreamOutput(StreamDescription=stream_description)
1✔
116

117
        raise ResourceNotFoundException(
1✔
118
            f"Requested resource not found: Stream: {stream_arn} not found"
119
        )
120

121
    @handler("GetRecords", expand=False)
1✔
122
    def get_records(self, context: RequestContext, payload: GetRecordsInput) -> GetRecordsOutput:
1✔
123
        _shard_iterator = payload["ShardIterator"]
1✔
124
        region_name = context.region
1✔
125
        if payload["ShardIterator"] in self.shard_to_region:
1✔
126
            region_name = self.shard_to_region[_shard_iterator]
1✔
127

128
        kinesis = get_kinesis_client(account_id=context.account_id, region_name=region_name)
1✔
129
        prefix, _, payload["ShardIterator"] = _shard_iterator.rpartition("|")
1✔
130
        try:
1✔
131
            kinesis_records = kinesis.get_records(**payload)
1✔
132
        except kinesis.exceptions.ExpiredIteratorException:
1✔
UNCOV
133
            self.shard_to_region.pop(_shard_iterator, None)
×
134
            LOG.debug("Shard iterator for underlying kinesis stream expired")
×
135
            raise ExpiredIteratorException("Shard iterator has expired")
×
136
        result = {
1✔
137
            "Records": [],
138
            "NextShardIterator": f"{prefix}|{kinesis_records.get('NextShardIterator')}",
139
        }
140
        for record in kinesis_records["Records"]:
1✔
141
            record_data = loads(record["Data"])
1✔
142
            record_data["dynamodb"]["SequenceNumber"] = record["SequenceNumber"]
1✔
143
            result["Records"].append(record_data)
1✔
144

145
        # Similar as the logic in GetShardIterator, we need to track the originating region when we get the
146
        # NextShardIterator in the results.
147
        if region_name != context.region and "NextShardIterator" in result:
1✔
148
            self.shard_to_region[result["NextShardIterator"]] = region_name
1✔
149
        return GetRecordsOutput(**result)
1✔
150

151
    def get_shard_iterator(
1✔
152
        self,
153
        context: RequestContext,
154
        stream_arn: StreamArn,
155
        shard_id: ShardId,
156
        shard_iterator_type: ShardIteratorType,
157
        sequence_number: SequenceNumber = None,
158
        **kwargs,
159
    ) -> GetShardIteratorOutput:
160
        stream_name = stream_name_from_stream_arn(stream_arn)
1✔
161
        og_region = get_original_region(context=context, stream_arn=stream_arn)
1✔
162
        stream_shard_id = kinesis_shard_id(shard_id)
1✔
163
        kinesis = get_kinesis_client(account_id=context.account_id, region_name=og_region)
1✔
164

165
        kwargs = {"StartingSequenceNumber": sequence_number} if sequence_number else {}
1✔
166
        result = kinesis.get_shard_iterator(
1✔
167
            StreamName=stream_name,
168
            ShardId=stream_shard_id,
169
            ShardIteratorType=shard_iterator_type,
170
            **kwargs,
171
        )
172
        del result["ResponseMetadata"]
1✔
173
        # TODO not quite clear what the |1| exactly denotes, because at AWS it's sometimes other numbers
174
        result["ShardIterator"] = f"{stream_arn}|1|{result['ShardIterator']}"
1✔
175

176
        # In case of a replica table, we need to keep track of the real region originating the shard iterator.
177
        # This region will be later used in GetRecords to redirect to the originating region, holding the data.
178
        if og_region != context.region:
1✔
179
            self.shard_to_region[result["ShardIterator"]] = og_region
1✔
180
        return GetShardIteratorOutput(**result)
1✔
181

182
    def list_streams(
1✔
183
        self,
184
        context: RequestContext,
185
        table_name: TableName = None,
186
        limit: PositiveIntegerObject = None,
187
        exclusive_start_stream_arn: StreamArn = None,
188
        **kwargs,
189
    ) -> ListStreamsOutput:
190
        og_region = get_original_region(context=context, table_name=table_name)
1✔
191
        store = get_dynamodbstreams_store(context.account_id, og_region)
1✔
192
        result = [
1✔
193
            select_from_typed_dict(Stream, _s.StreamDescription)
194
            for _s in store.ddb_streams.values()
195
        ]
196
        if table_name:
1✔
197
            result: list[Stream] = [res for res in result if res["TableName"] == table_name]
1✔
198
            # If this is a stream from a table replica, we need to change the region in the stream ARN, as LocalStack
199
            # keeps a stream only in the originating region.
200
            if context.region != og_region:
1✔
201
                for stream in result:
1✔
202
                    stream["StreamArn"] = change_region_in_ddb_stream_arn(
1✔
203
                        stream["StreamArn"], context.region
204
                    )
205

206
        return ListStreamsOutput(Streams=result)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc