• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 22048723723

13 Feb 2026 06:59PM UTC coverage: 87.006% (+0.1%) from 86.883%
22048723723

push

github

web-flow
CW Logs: Test suite for service internalization (#13692)

22 of 22 new or added lines in 1 file covered. (100.0%)

928 existing lines in 33 files now uncovered.

69716 of 80128 relevant lines covered (87.01%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.47
/localstack-core/localstack/services/dynamodbstreams/dynamodbstreams_api.py
1
import logging
1✔
2
import threading
1✔
3
from typing import TYPE_CHECKING
1✔
4

5
from bson.json_util import dumps
1✔
6

7
from localstack import config
1✔
8
from localstack.aws.api import RequestContext
1✔
9
from localstack.aws.api.dynamodbstreams import (
1✔
10
    StreamDescription,
11
    StreamStatus,
12
    StreamViewType,
13
    TableName,
14
)
15
from localstack.aws.connect import connect_to
1✔
16
from localstack.services.dynamodb.v2.provider import DynamoDBProvider
1✔
17
from localstack.services.dynamodbstreams.models import (
1✔
18
    DynamoDbStreamsStore,
19
    StreamWrapper,
20
    dynamodbstreams_stores,
21
)
22
from localstack.utils.aws import arns, resources
1✔
23
from localstack.utils.common import now_utc
1✔
24
from localstack.utils.threads import FuncThread
1✔
25

26
if TYPE_CHECKING:
1✔
UNCOV
27
    from mypy_boto3_kinesis import KinesisClient
×
28

29
# Name prefix for the local Kinesis streams that back DynamoDB streams.
DDB_KINESIS_STREAM_NAME_PREFIX = "__ddb_stream_"

LOG = logging.getLogger(__name__)

# Guards the process-wide sequence number counter below.
_SEQUENCE_MTX = threading.RLock()
# Monotonically increasing counter used to fabricate stream record SequenceNumbers.
_SEQUENCE_NUMBER_COUNTER = 1
35

36

37
def get_dynamodbstreams_store(account_id: str, region: str) -> DynamoDbStreamsStore:
    """Return the DynamoDB Streams store for the given account and region."""
    account_stores = dynamodbstreams_stores[account_id]
    return account_stores[region]
39

40

41
def get_and_increment_sequence_number_counter() -> int:
    """Return the current global sequence number and advance the counter by one.

    Thread-safe: the read-and-increment happens under the module-level lock.
    """
    global _SEQUENCE_NUMBER_COUNTER
    with _SEQUENCE_MTX:
        current = _SEQUENCE_NUMBER_COUNTER
        _SEQUENCE_NUMBER_COUNTER = current + 1
        return current
47

48

49
def get_kinesis_client(account_id: str, region_name: str) -> "KinesisClient":
    """Return a Kinesis client for the given account/region.

    The endpoint URL is pinned explicitly to ensure requests always hit the
    local Kinesis instance.
    """
    client_factory = connect_to(
        aws_access_key_id=account_id,
        region_name=region_name,
        endpoint_url=config.internal_service_url(),
    )
    return client_factory.kinesis
56

57

58
def add_dynamodb_stream(
    account_id: str,
    region_name: str,
    table_name: str,
    latest_stream_label: str | None = None,
    view_type: StreamViewType = StreamViewType.NEW_AND_OLD_IMAGES,
    enabled: bool = True,
) -> None:
    """Register a DynamoDB stream for *table_name* and create its Kinesis backend.

    No-op when *enabled* is falsy. The stream records are physically stored in a
    local Kinesis stream; the stream description is kept in the regional store.
    """
    if not enabled:
        return

    # create the kinesis stream that backs this DynamoDB stream
    backing_stream_name = get_kinesis_stream_name(table_name)
    resources.create_kinesis_stream(
        get_kinesis_client(account_id, region_name),
        stream_name=backing_stream_name,
    )

    label = latest_stream_label or "latest"
    stream_arn = arns.dynamodb_stream_arn(
        table_name=table_name,
        latest_stream_label=label,
        account_id=account_id,
        region_name=region_name,
    )
    description = StreamDescription(
        TableName=table_name,
        StreamArn=stream_arn,
        StreamLabel=label,
        StreamStatus=StreamStatus.ENABLING,
        KeySchema=[],
        Shards=[],
        StreamViewType=view_type,
    )

    store = get_dynamodbstreams_store(account_id, region_name)
    store.ddb_streams[table_name] = StreamWrapper(StreamDescription=description)
93

94

95
def get_stream_for_table(
    account_id: str, region_name: str, table_arn: str
) -> StreamDescription | None:
    """Return the stream description registered for *table_arn*, or None if absent."""
    table_name = table_name_from_stream_arn(table_arn)
    store = get_dynamodbstreams_store(account_id, region_name)
    wrapper = store.ddb_streams.get(table_name)
    return wrapper.StreamDescription if wrapper else None
103

104

105
def _process_forwarded_records(
    account_id: str, region_name: str, table_name: TableName, table_records: dict, kinesis
) -> None:
    """Publish one table's forwarded DynamoDB records to its backing Kinesis stream.

    Skips publishing entirely when the table has no DynamoDB Stream enabled.
    Records forwarded from Kinesis always carry both OldImage and NewImage, so
    when the configured StreamViewType is narrower than NEW_AND_OLD_IMAGES the
    superfluous image(s) are filtered out before publishing, as the settings
    are different for DDB Streams and Kinesis.

    Fix over the previous version: the SequenceNumber-injection and Kinesis
    record construction loop was duplicated across both branches; it is now a
    single loop driven by an (optionally empty) image filter, with identical
    behavior (records are still mutated in place when no filtering applies).
    """
    records = table_records["records"]
    stream_type = table_records["table_stream_type"]
    # if the table does not have a DynamoDB Streams enabled, skip publishing anything
    if not stream_type.stream_view_type:
        return

    # StreamViewType determines what information is written to the stream for the table
    # when an item in the table is inserted, updated or deleted
    image_filter: set = set()
    if stream_type.is_kinesis and stream_type.stream_view_type != StreamViewType.NEW_AND_OLD_IMAGES:
        if stream_type.stream_view_type == StreamViewType.KEYS_ONLY:
            image_filter = {"OldImage", "NewImage"}
        elif stream_type.stream_view_type == StreamViewType.OLD_IMAGE:
            image_filter = {"NewImage"}
        elif stream_type.stream_view_type == StreamViewType.NEW_IMAGE:
            image_filter = {"OldImage"}

    kinesis_records = []
    for record in records:
        if image_filter:
            # drop the image(s) the configured view type does not include
            record["dynamodb"] = {
                k: v for k, v in record["dynamodb"].items() if k not in image_filter
            }

        # we can mutate the record for SequenceNumber, the Kinesis forwarding takes care of filtering it
        if "SequenceNumber" not in record["dynamodb"]:
            record["dynamodb"]["SequenceNumber"] = str(
                get_and_increment_sequence_number_counter()
            )

        # simply pass along the records, they already have the right format
        kinesis_records.append({"Data": dumps(record), "PartitionKey": "TODO"})

    stream_name = get_kinesis_stream_name(table_name)
    kinesis.put_records(
        StreamName=stream_name,
        Records=kinesis_records,
    )
158

159

160
def forward_events(account_id: str, region_name: str, records_map: dict[TableName, dict]) -> None:
    """Forward each table's batched records to its backing Kinesis stream."""
    kinesis_client = get_kinesis_client(account_id, region_name)
    for name, batch in records_map.items():
        _process_forwarded_records(account_id, region_name, name, batch, kinesis_client)
165

166

167
def delete_streams(account_id: str, region_name: str, table_arn: str) -> None:
    """Unregister the stream for *table_arn* and asynchronously delete its Kinesis backend.

    No-op when no stream is registered for the table. The backing Kinesis
    stream is removed fire-and-forget on a background thread, since deletion
    requires waiting for the stream to become ACTIVE first.
    """
    store = get_dynamodbstreams_store(account_id, region_name)
    table_name = table_name_from_table_arn(table_arn)
    if not store.ddb_streams.pop(table_name, None):
        return

    stream_name = get_kinesis_stream_name(table_name)

    # we're basically asynchronously trying to delete the stream, or should we do this
    # "synchronous" with the table deletion?
    def _delete_stream(*args, **kwargs):
        try:
            kinesis_client = get_kinesis_client(account_id, region_name)
            # needs to be active otherwise we can't delete it
            kinesis_client.get_waiter("stream_exists").wait(StreamName=stream_name)
            kinesis_client.delete_stream(StreamName=stream_name, EnforceConsumerDeletion=True)
            kinesis_client.get_waiter("stream_not_exists").wait(StreamName=stream_name)
        except Exception:
            LOG.warning(
                "Failed to delete underlying kinesis stream for dynamodb table table_arn=%s",
                table_arn,
                exc_info=LOG.isEnabledFor(logging.DEBUG),
            )

    FuncThread(_delete_stream).start()  # fire & forget
191

192

193
def get_kinesis_stream_name(table_name: str) -> str:
    """Name of the Kinesis stream backing the DynamoDB stream for *table_name*."""
    return f"{DDB_KINESIS_STREAM_NAME_PREFIX}{table_name}"
195

196

197
def table_name_from_stream_arn(stream_arn: str) -> str:
    """Extract the table name from a DynamoDB stream (or table) ARN."""
    # everything after ":table/" up to the next path separator is the table name
    remainder = stream_arn.split(":table/", 1)[-1]
    return remainder.partition("/")[0]
199

200

201
def table_name_from_table_arn(table_arn: str) -> str:
    """Extract the table name from a DynamoDB table ARN.

    Table ARNs contain the same ":table/<name>" segment as stream ARNs, so the
    stream-ARN parser handles both.
    """
    return table_name_from_stream_arn(table_arn)
203

204

205
def stream_name_from_stream_arn(stream_arn: str) -> str:
    """Return the backing Kinesis stream name for the stream identified by *stream_arn*."""
    return get_kinesis_stream_name(table_name_from_stream_arn(stream_arn))
208

209

210
def shard_id(kinesis_shard_id: str) -> str:
    """Derive a DynamoDB-Streams-style shard id from a Kinesis shard id.

    Combines the Kinesis shard id's prefix, a zero-padded 20-digit timestamp
    (truncated to 100000-second granularity), and the first 32 characters of
    the Kinesis shard id's last segment.
    """
    ts = str(int(now_utc()))
    padded_ts = f"{ts[:-5]}00000000".rjust(20, "0")
    prefix = kinesis_shard_id.partition("-")[0]
    suffix = kinesis_shard_id.rpartition("-")[2]
    return f"{prefix}-{padded_ts}-{suffix[:32]}"
215

216

217
def kinesis_shard_id(dynamodbstream_shard_id: str) -> str:
    """Map a DynamoDB Streams shard id back to its Kinesis shard id form.

    Keeps only the first and last dash-separated segments.
    """
    segments = dynamodbstream_shard_id.split("-")
    return "-".join((segments[0], segments[-1]))
220

221

222
def get_shard_id(stream: StreamWrapper, kinesis_shard_id: str) -> str:
    """Return the DDB-stream shard id for *kinesis_shard_id*, creating and caching it on first use."""
    cached = stream.shards_id_map.get(kinesis_shard_id)
    if cached:
        return cached

    fresh = shard_id(kinesis_shard_id)
    stream.shards_id_map[kinesis_shard_id] = fresh
    return fresh
229

230

231
def get_original_region(
    context: RequestContext, stream_arn: str | None = None, table_name: str | None = None
) -> str:
    """Resolve the region that owns the table/stream in a DDB Global Tables setup.

    In DDB Global tables, all requests are forwarded to the original region
    instead of really replicating the data. Since each table has a separate
    stream associated, DDB Streams needs the same forwarding logic. The table
    name is either given directly or parsed from the stream ARN; when neither
    is available, the request's own region is returned.
    """
    if not (stream_arn or table_name):
        LOG.debug(
            "No Stream ARN or table name provided. Returning region '%s' from the request",
            context.region,
        )
        return context.region

    resolved_table_name = table_name or table_name_from_stream_arn(stream_arn)
    return DynamoDBProvider.get_global_table_region(context=context, table_name=resolved_table_name)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc