• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20565403496

29 Dec 2025 05:11AM UTC coverage: 84.103% (-2.8%) from 86.921%
20565403496

Pull #13567

github

web-flow
Merge 4816837a5 into 2417384aa
Pull Request #13567: Update ASF APIs

67166 of 79862 relevant lines covered (84.1%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.38
/localstack-core/localstack/services/dynamodb/utils.py
1
import logging
1✔
2
import re
1✔
3
from binascii import crc32
1✔
4

5
from boto3.dynamodb.types import TypeDeserializer, TypeSerializer
1✔
6
from cachetools import TTLCache
1✔
7
from moto.core.exceptions import JsonRESTError
1✔
8

9
from localstack.aws.api import RequestContext
1✔
10
from localstack.aws.api.dynamodb import (
1✔
11
    AttributeMap,
12
    BatchGetRequestMap,
13
    BatchGetResponseMap,
14
    Delete,
15
    DeleteRequest,
16
    Put,
17
    PutRequest,
18
    ResourceNotFoundException,
19
    TableName,
20
    Update,
21
)
22
from localstack.aws.api.dynamodbstreams import (
1✔
23
    ResourceNotFoundException as DynamoDBStreamsResourceNotFoundException,
24
)
25
from localstack.aws.connect import connect_to
1✔
26
from localstack.constants import INTERNAL_AWS_SECRET_ACCESS_KEY
1✔
27
from localstack.http import Response
1✔
28
from localstack.utils.aws.arns import (
1✔
29
    dynamodb_stream_arn,
30
    dynamodb_table_arn,
31
    get_partition,
32
    parse_arn,
33
)
34
from localstack.utils.json import canonical_json
1✔
35
from localstack.utils.testutil import list_all_resources
1✔
36

37
LOG = logging.getLogger(__name__)

# cache schema definitions
# Entries are keyed by table ARN and hold the `describe_table` response stored by
# `SchemaExtractor.get_table_schema`. The cache is bounded (maxsize=50) and entries expire (ttl=20).
SCHEMA_CACHE = TTLCache(maxsize=50, ttl=20)

# Matches a JSON ARN field (TableArn, LatestStreamArn, StreamArn, ShardIterator or IndexArn) whose value uses the
# placeholder region "ddblocal" and account "000000000000".
# Group 1 is the quoted field name, group 2 is everything after the account ID (the resource part).
_ddb_local_arn_pattern = re.compile(
    r'("TableArn"|"LatestStreamArn"|"StreamArn"|"ShardIterator"|"IndexArn")\s*:\s*"arn:[a-z-]+:dynamodb:ddblocal:000000000000:([^"]+)"'
)
# Matches a JSON `"awsRegion": "<value>"` field; group 1 is the region value.
_ddb_local_region_pattern = re.compile(r'"awsRegion"\s*:\s*"([^"]+)"')
# Matches a bare "ddblocal" ARN (not tied to a JSON field name); group 1 is the resource part.
# Applied to dynamodbstreams exception responses in `modify_ddblocal_arns`.
_ddb_local_exception_arn_pattern = re.compile(r'arn:[a-z-]+:dynamodb:ddblocal:000000000000:([^"]+)')
1✔
47

48

49
def get_ddb_access_key(account_id: str, region_name: str) -> str:
    """
    Build the access key used to address an account/region namespace in DynamoDB Local.

    DDBLocal has an undocumented namespacing feature that is driven by the `Credentials` field of the
    `Authorization` header. The namespace used here is the account ID immediately followed by the region
    name, with every dash removed.

    :param account_id: the account ID of the namespace
    :param region_name: the region name of the namespace
    :return: the concatenated, dash-free access key
    """
    namespace = f"{account_id}{region_name}"
    return namespace.replace("-", "")
1✔
58

59

60
class ItemSet:
    """A collection of items indexed by their key attributes, so that individual items can be looked up."""

    def __init__(self, items: list[dict], key_schema: list[dict]):
        """
        :param items: the items to index
        :param key_schema: key schema entries (each carrying an `AttributeName`) naming the key attributes
        """
        self.items_list = items
        self.key_schema = key_schema
        self._build_dict()

    def _build_dict(self):
        """(Re)build the lookup table that maps each item's canonical key string to the item."""
        self.items_dict = {}
        self.items_dict.update((self._hashable_key(entry), entry) for entry in self.items_list)

    def _hashable_key(self, item: dict):
        """Return the key attributes of `item` serialized with `canonical_json`, usable as a dict key."""
        key_attributes = SchemaExtractor.extract_keys_for_schema(
            item=item, key_schema=self.key_schema
        )
        return canonical_json(key_attributes)

    def find_item(self, item: dict) -> dict | None:
        """Return the indexed item that has the same key attributes as `item`, or None if there is none."""
        return self.items_dict.get(self._hashable_key(item))
1✔
80

81

82
class SchemaExtractor:
1✔
83
    @classmethod
1✔
84
    def extract_keys(
1✔
85
        cls, item: dict, table_name: str, account_id: str, region_name: str
86
    ) -> dict | None:
87
        key_schema = cls.get_key_schema(table_name, account_id, region_name)
1✔
88
        return cls.extract_keys_for_schema(item, key_schema)
1✔
89

90
    @classmethod
1✔
91
    def extract_keys_for_schema(cls, item: dict, key_schema: list[dict]):
1✔
92
        result = {}
1✔
93
        for key in key_schema:
1✔
94
            attr_name = key["AttributeName"]
1✔
95
            if attr_name not in item:
1✔
96
                raise JsonRESTError(
×
97
                    error_type="ValidationException",
98
                    message="One of the required keys was not given a value",
99
                )
100
            result[attr_name] = item[attr_name]
1✔
101
        return result
1✔
102

103
    @classmethod
1✔
104
    def get_key_schema(
1✔
105
        cls, table_name: str, account_id: str, region_name: str
106
    ) -> list[dict] | None:
107
        from localstack.services.dynamodb.provider import get_store
1✔
108

109
        table_definitions: dict = get_store(
1✔
110
            account_id=account_id,
111
            region_name=region_name,
112
        ).table_definitions
113
        table_def = table_definitions.get(table_name)
1✔
114
        if not table_def:
1✔
115
            # Try fetching from the backend in case table_definitions has been reset
116
            schema = cls.get_table_schema(
1✔
117
                table_name=table_name, account_id=account_id, region_name=region_name
118
            )
119
            if not schema:
1✔
120
                raise ResourceNotFoundException(f"Unknown table: {table_name} not found")
×
121
            # Save the schema in the cache
122
            table_definitions[table_name] = schema["Table"]
1✔
123
            table_def = table_definitions[table_name]
1✔
124
        return table_def["KeySchema"]
1✔
125

126
    @classmethod
1✔
127
    def get_table_schema(cls, table_name: str, account_id: str, region_name: str):
1✔
128
        key = dynamodb_table_arn(
1✔
129
            table_name=table_name, account_id=account_id, region_name=region_name
130
        )
131
        schema = SCHEMA_CACHE.get(key)
1✔
132
        if not schema:
1✔
133
            # TODO: consider making in-memory lookup instead of API call
134
            ddb_client = connect_to(
1✔
135
                aws_access_key_id=account_id,
136
                aws_secret_access_key=INTERNAL_AWS_SECRET_ACCESS_KEY,
137
                region_name=region_name,
138
            ).dynamodb
139
            try:
1✔
140
                schema = ddb_client.describe_table(TableName=table_name)
1✔
141
                SCHEMA_CACHE[key] = schema
1✔
142
            except Exception as e:
×
143
                if "ResourceNotFoundException" in str(e):
×
144
                    raise ResourceNotFoundException(f"Unknown table: {table_name}") from e
×
145
                raise
×
146
        return schema
1✔
147

148
    @classmethod
1✔
149
    def invalidate_table_schema(cls, table_name: str, account_id: str, region_name: str):
1✔
150
        """
151
        Allow cached table schemas to be invalidated without waiting for the TTL to expire
152
        """
153
        key = dynamodb_table_arn(
1✔
154
            table_name=table_name, account_id=account_id, region_name=region_name
155
        )
156
        SCHEMA_CACHE.pop(key, None)
1✔
157

158

159
class ItemFinder:
    """Helpers to look up items that are currently stored in DynamoDB Local."""

    @staticmethod
    def get_ddb_local_client(account_id: str, region_name: str, endpoint_url: str):
        """
        Return a DynamoDB client for the given endpoint, whose access key selects the account/region
        namespace (see `get_ddb_access_key`).
        """
        return connect_to(
            aws_access_key_id=get_ddb_access_key(account_id, region_name),
            region_name=region_name,
            endpoint_url=endpoint_url,
        ).dynamodb

    @staticmethod
    def _build_search_key(put_item: dict, table_schema: dict) -> dict:
        """
        Derive the primary key of a request that carries a full `Item` instead of an explicit `Key`.

        :param put_item: a request dict with an `Item` entry
        :param table_schema: a `describe_table` response for the target table
        :return: the key attributes of the item, as named by the table's `KeySchema`
        :raises ValidationException: if the item has no (or an empty) value for one of the key attributes
        """
        # TODO: key schemas of GlobalSecondaryIndexes are not considered yet
        search_key = {}
        for key in table_schema["Table"]["KeySchema"]:
            key_name = key["AttributeName"]
            key_value = put_item["Item"].get(key_name)
            if not key_value:
                # imported lazily, as in the rest of this module, to avoid a circular import
                from localstack.services.dynamodb.provider import ValidationException

                raise ValidationException("The provided key element does not match the schema")
            search_key[key_name] = key_value
        return search_key

    @staticmethod
    def find_existing_item(
        put_item: dict,
        table_name: str,
        account_id: str,
        region_name: str,
        endpoint_url: str,
    ) -> AttributeMap | None:
        """
        Fetch the item that the given request refers to.

        The lookup key is the request's `Key` if present, otherwise it is derived from the request's
        `Item` and the table's key schema.

        :return: the `Item` of the `get_item` response; None if the response carries no item, if no key
            could be determined, or if the lookup failed with a client error (which is logged)
        :raises ValidationException: if the request's `Item` lacks one of the key attributes
        """
        if "Key" in put_item:
            search_key = put_item["Key"]
        else:
            table_schema = SchemaExtractor.get_table_schema(table_name, account_id, region_name)
            search_key = ItemFinder._build_search_key(put_item, table_schema)
            if not search_key:
                return None

        ddb_client = ItemFinder.get_ddb_local_client(account_id, region_name, endpoint_url)
        try:
            existing_item = ddb_client.get_item(TableName=table_name, Key=search_key)
        except ddb_client.exceptions.ClientError as e:
            # best effort: a failed lookup is reported as "no existing item"
            LOG.warning(
                "Unable to get item from DynamoDB table '%s': %s",
                table_name,
                e,
            )
            return None

        return existing_item.get("Item")

    @staticmethod
    def find_existing_items(
        put_items_per_table: dict[
            TableName, list[PutRequest | DeleteRequest | Put | Update | Delete]
        ],
        account_id: str,
        region_name: str,
        endpoint_url: str,
    ) -> BatchGetResponseMap:
        """
        Fetch, with a single `batch_get_item` call, the items that the given requests refer to.

        For each request the lookup key is its `Key` if present, otherwise it is derived from its `Item`
        and the key schema of the table (which is resolved at most once per table).

        :param put_items_per_table: the requests, grouped by table name
        :return: the `Responses` of the `batch_get_item` call; an empty dict if there is nothing to look
            up or if the lookup failed with a client error (which is logged)
        :raises ValidationException: if a request's `Item` lacks one of the key attributes
        """
        get_items_request: BatchGetRequestMap = {}
        for table_name, put_item_reqs in put_items_per_table.items():
            table_schema = None
            for put_item in put_item_reqs:
                if "Key" in put_item:
                    search_key = put_item["Key"]
                else:
                    if not table_schema:
                        table_schema = SchemaExtractor.get_table_schema(
                            table_name, account_id, region_name
                        )
                    search_key = ItemFinder._build_search_key(put_item, table_schema)
                    if not search_key:
                        continue
                table_keys = get_items_request.setdefault(table_name, {"Keys": []})
                table_keys["Keys"].append(search_key)

        if not get_items_request:
            # nothing to look up - skip the backend round trip, which could only fail
            return {}

        ddb_client = ItemFinder.get_ddb_local_client(account_id, region_name, endpoint_url)
        try:
            existing_items = ddb_client.batch_get_item(RequestItems=get_items_request)
        except ddb_client.exceptions.ClientError as e:
            # best effort: a failed lookup is reported as "no existing items"; log the table names only
            LOG.warning(
                "Unable to get items from DynamoDB tables '%s': %s",
                list(get_items_request),
                e,
            )
            return {}

        return existing_items.get("Responses", {})

    @classmethod
    def list_existing_items_for_statement(
        cls, partiql_statement: str, account_id: str, region_name: str, endpoint_url: str
    ) -> list:
        """
        Return all items of the table targeted by the given PartiQL write statement, or an empty list if
        no table name can be extracted from the statement.
        """
        table_name = extract_table_name_from_partiql_update(partiql_statement)
        if not table_name:
            return []
        return cls.get_all_table_items(
            account_id=account_id,
            region_name=region_name,
            table_name=table_name,
            endpoint_url=endpoint_url,
        )

    @staticmethod
    def get_all_table_items(
        account_id: str, region_name: str, table_name: str, endpoint_url: str
    ) -> list:
        """Return all items of the table, collected by scanning it page by page."""
        ddb_client = ItemFinder.get_ddb_local_client(account_id, region_name, endpoint_url)
        dynamodb_kwargs = {"TableName": table_name}
        return list_all_resources(
            # merge the pagination kwargs of each page request with the fixed scan parameters
            lambda kwargs: ddb_client.scan(**{**kwargs, **dynamodb_kwargs}),
            last_token_attr_name="LastEvaluatedKey",
            next_token_attr_name="ExclusiveStartKey",
            list_attr_name="Items",
        )
1✔
300

301

302
def extract_table_name_from_partiql_update(statement: str) -> str | None:
1✔
303
    regex = r"^\s*(UPDATE|INSERT\s+INTO|DELETE\s+FROM)\s+([^\s]+).*"
1✔
304
    match = re.match(regex, statement, flags=re.IGNORECASE | re.MULTILINE)
1✔
305
    return match and match.group(2)
1✔
306

307

308
def dynamize_value(value) -> dict:
    """
    Convert a Python value (scalar, dict or list) into its DynamoDB representation, i.e., a dict made up of
    the DynamoDB type descriptor and the value to send to DynamoDB. A TypeError is raised for values whose
    type is not supported.
    """
    serializer = TypeSerializer()
    return serializer.serialize(value)
1✔
314

315

316
def de_dynamize_record(item: dict) -> dict:
    """
    Convert an item in DynamoDB format into a regular dict by deserializing each top-level attribute value,
    e.g., `{'foo': {'S': 'test'}}` becomes `{'foo': 'test'}`.
    Note: This is the reverse operation of `dynamize_value(...)` above, applied to every attribute.
    """
    deserialize = TypeDeserializer().deserialize
    record = {}
    for attribute_name, typed_value in item.items():
        record[attribute_name] = deserialize(typed_value)
    return record
1✔
324

325

326
def _localize_ddblocal_payload(context: RequestContext, payload: str) -> str:
    """Return `payload` with the "ddblocal" placeholder ARNs/regions replaced by those of the request."""
    partition = get_partition(context.region)

    def _convert_arn(matchobj):
        key = matchobj.group(1)
        table_name = matchobj.group(2)
        return f'{key}: "arn:{partition}:dynamodb:{context.region}:{context.account_id}:{table_name}"'

    # fix the table and latest stream ARNs (DynamoDBLocal hardcodes "ddblocal" as the region)
    rewritten = _ddb_local_arn_pattern.sub(_convert_arn, payload)
    if context.service.service_name != "dynamodbstreams":
        return rewritten

    # stream responses additionally carry the region in "awsRegion" fields ...
    rewritten = _ddb_local_region_pattern.sub(f'"awsRegion": "{context.region}"', rewritten)
    if context.service_exception:
        # ... and error responses may contain ARNs outside of the known ARN fields
        rewritten = _ddb_local_exception_arn_pattern.sub(
            rf"arn:{partition}:dynamodb:{context.region}:{context.account_id}:\g<1>",
            rewritten,
        )
    return rewritten


def modify_ddblocal_arns(chain, context: RequestContext, response: Response):
    """
    Service response handler that rewrites the response of the DynamoDB backend, replacing the placeholder
    ARNs (and, for dynamodbstreams, regions) with the region and account of the request, and that (re)sets
    the `x-amz-crc32` response header.
    """
    payload = response.get_data(as_text=True)
    if payload:
        rewritten = _localize_ddblocal_payload(context, payload)
        if rewritten != payload:
            response.data = rewritten
            # make sure the service response is parsed again later
            context.service_response = None

    # update x-amz-crc32 header required by some clients
    response.headers["x-amz-crc32"] = crc32(response.data) & 0xFFFFFFFF
1✔
358

359

360
def change_region_in_ddb_stream_arn(arn: str, region: str) -> str:
    """
    Return the given DynamoDB Stream ARN with its region replaced by `region`.

    This is needed for global tables: the stream is only created in the originating region, so its ARN has
    to be rewritten to mimic the stream of a replica region. An ARN that already has the requested region is
    returned unchanged.

    :raises Exception: if the ARN does not belong to the dynamodb service
    :raises DynamoDBStreamsResourceNotFoundException: if the resource part of the ARN does not have the
        expected four path segments
    """
    parsed = parse_arn(arn)
    if parsed["region"] == region:
        return arn

    if parsed["service"] != "dynamodb":
        raise Exception(f"{arn} is not a DynamoDB Streams ARN")

    # Note: a DynamoDB Streams ARN has the following pattern:
    #   arn:aws:dynamodb:<region>:<account>:table/<table_name>/stream/<latest_stream_label>
    segments = parsed["resource"].split("/")
    if len(segments) != 4:
        raise DynamoDBStreamsResourceNotFoundException(
            f"The format of the '{arn}' ARN is not valid"
        )

    _, table_name, _, stream_label = segments
    return dynamodb_stream_arn(
        table_name=table_name,
        latest_stream_label=stream_label,
        account_id=parsed["account"],
        region_name=region,
    )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc