• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19844934392

01 Dec 2025 07:55PM UTC coverage: 86.945% (+0.1%) from 86.821%
19844934392

push

github

web-flow
Update ASF APIs, provider signatures, disable lambda patches (#13444)

Co-authored-by: LocalStack Bot <localstack-bot@users.noreply.github.com>
Co-authored-by: Silvio Vasiljevic <silvio.vasiljevic@gmail.com>

69707 of 80174 relevant lines covered (86.94%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.88
/localstack-core/localstack/services/dynamodb/provider.py
1
import copy
1✔
2
import json
1✔
3
import logging
1✔
4
import os
1✔
5
import random
1✔
6
import re
1✔
7
import threading
1✔
8
import time
1✔
9
import traceback
1✔
10
from collections import defaultdict
1✔
11
from concurrent.futures import ThreadPoolExecutor
1✔
12
from contextlib import contextmanager
1✔
13
from datetime import datetime
1✔
14
from operator import itemgetter
1✔
15

16
import requests
1✔
17
import werkzeug
1✔
18

19
from localstack import config
1✔
20
from localstack.aws import handlers
1✔
21
from localstack.aws.api import (
1✔
22
    CommonServiceException,
23
    RequestContext,
24
    ServiceRequest,
25
    ServiceResponse,
26
    handler,
27
)
28
from localstack.aws.api.dynamodb import (
1✔
29
    ApproximateCreationDateTimePrecision,
30
    AttributeMap,
31
    BatchExecuteStatementOutput,
32
    BatchGetItemOutput,
33
    BatchGetRequestMap,
34
    BatchGetResponseMap,
35
    BatchWriteItemInput,
36
    BatchWriteItemOutput,
37
    BatchWriteItemRequestMap,
38
    BillingMode,
39
    ContinuousBackupsDescription,
40
    ContinuousBackupsStatus,
41
    CreateGlobalTableOutput,
42
    CreateTableInput,
43
    CreateTableOutput,
44
    Delete,
45
    DeleteItemInput,
46
    DeleteItemOutput,
47
    DeleteRequest,
48
    DeleteTableOutput,
49
    DescribeContinuousBackupsOutput,
50
    DescribeContributorInsightsInput,
51
    DescribeContributorInsightsOutput,
52
    DescribeGlobalTableOutput,
53
    DescribeKinesisStreamingDestinationOutput,
54
    DescribeTableOutput,
55
    DescribeTimeToLiveOutput,
56
    DestinationStatus,
57
    DynamodbApi,
58
    EnableKinesisStreamingConfiguration,
59
    ExecuteStatementInput,
60
    ExecuteStatementOutput,
61
    ExecuteTransactionInput,
62
    ExecuteTransactionOutput,
63
    GetItemInput,
64
    GetItemOutput,
65
    GlobalTableAlreadyExistsException,
66
    GlobalTableNotFoundException,
67
    KinesisStreamingDestinationOutput,
68
    ListGlobalTablesOutput,
69
    ListTablesInputLimit,
70
    ListTablesOutput,
71
    ListTagsOfResourceOutput,
72
    NextTokenString,
73
    PartiQLBatchRequest,
74
    PointInTimeRecoveryDescription,
75
    PointInTimeRecoverySpecification,
76
    PointInTimeRecoveryStatus,
77
    PositiveIntegerObject,
78
    ProvisionedThroughputExceededException,
79
    Put,
80
    PutItemInput,
81
    PutItemOutput,
82
    PutRequest,
83
    QueryInput,
84
    QueryOutput,
85
    RegionName,
86
    ReplicaDescription,
87
    ReplicaList,
88
    ReplicaStatus,
89
    ReplicaUpdateList,
90
    ResourceArnString,
91
    ResourceInUseException,
92
    ResourceNotFoundException,
93
    ReturnConsumedCapacity,
94
    ScanInput,
95
    ScanOutput,
96
    StreamArn,
97
    TableArn,
98
    TableDescription,
99
    TableName,
100
    TagKeyList,
101
    TagList,
102
    TimeToLiveSpecification,
103
    TransactGetItemList,
104
    TransactGetItemsOutput,
105
    TransactWriteItem,
106
    TransactWriteItemList,
107
    TransactWriteItemsInput,
108
    TransactWriteItemsOutput,
109
    Update,
110
    UpdateContinuousBackupsOutput,
111
    UpdateGlobalTableOutput,
112
    UpdateItemInput,
113
    UpdateItemOutput,
114
    UpdateKinesisStreamingConfiguration,
115
    UpdateKinesisStreamingDestinationOutput,
116
    UpdateTableInput,
117
    UpdateTableOutput,
118
    UpdateTimeToLiveOutput,
119
    WriteRequest,
120
)
121
from localstack.aws.api.dynamodbstreams import StreamStatus
1✔
122
from localstack.aws.connect import connect_to
1✔
123
from localstack.config import is_persistence_enabled
1✔
124
from localstack.constants import (
1✔
125
    AUTH_CREDENTIAL_REGEX,
126
    AWS_REGION_US_EAST_1,
127
    INTERNAL_AWS_SECRET_ACCESS_KEY,
128
)
129
from localstack.http import Request, Response, route
1✔
130
from localstack.services.dynamodb.models import (
1✔
131
    DynamoDBStore,
132
    RecordsMap,
133
    StreamRecord,
134
    StreamRecords,
135
    TableRecords,
136
    TableStreamType,
137
    dynamodb_stores,
138
)
139
from localstack.services.dynamodb.server import DynamodbServer
1✔
140
from localstack.services.dynamodb.utils import (
1✔
141
    ItemFinder,
142
    ItemSet,
143
    SchemaExtractor,
144
    de_dynamize_record,
145
    extract_table_name_from_partiql_update,
146
    get_ddb_access_key,
147
    modify_ddblocal_arns,
148
)
149
from localstack.services.dynamodbstreams import dynamodbstreams_api
1✔
150
from localstack.services.dynamodbstreams.models import dynamodbstreams_stores
1✔
151
from localstack.services.edge import ROUTER
1✔
152
from localstack.services.plugins import ServiceLifecycleHook
1✔
153
from localstack.state import AssetDirectory, StateVisitor
1✔
154
from localstack.utils.aws import arns
1✔
155
from localstack.utils.aws.arns import (
1✔
156
    extract_account_id_from_arn,
157
    extract_region_from_arn,
158
    get_partition,
159
)
160
from localstack.utils.aws.aws_stack import get_valid_regions_for_service
1✔
161
from localstack.utils.aws.request_context import (
1✔
162
    extract_account_id_from_headers,
163
    extract_region_from_headers,
164
)
165
from localstack.utils.collections import select_attributes, select_from_typed_dict
1✔
166
from localstack.utils.common import short_uid, to_bytes
1✔
167
from localstack.utils.files import cp_r, rm_rf
1✔
168
from localstack.utils.json import BytesEncoder, canonical_json
1✔
169
from localstack.utils.scheduler import Scheduler
1✔
170
from localstack.utils.strings import long_uid, md5, to_str
1✔
171
from localstack.utils.threads import FuncThread, start_thread
1✔
172

173
# set up logger
174
LOG = logging.getLogger(__name__)
1✔
175

176
# action header prefix
177
ACTION_PREFIX = "DynamoDB_20120810."
1✔
178

179
# list of actions subject to throughput limitations
180
READ_THROTTLED_ACTIONS = [
1✔
181
    "GetItem",
182
    "Query",
183
    "Scan",
184
    "TransactGetItems",
185
    "BatchGetItem",
186
]
187
WRITE_THROTTLED_ACTIONS = [
1✔
188
    "PutItem",
189
    "BatchWriteItem",
190
    "UpdateItem",
191
    "DeleteItem",
192
    "TransactWriteItems",
193
]
194
THROTTLED_ACTIONS = READ_THROTTLED_ACTIONS + WRITE_THROTTLED_ACTIONS
1✔
195

196
MANAGED_KMS_KEYS = {}
1✔
197

198

199
def dynamodb_table_exists(table_name: str, client=None) -> bool:
1✔
200
    client = client or connect_to().dynamodb
1✔
201
    paginator = client.get_paginator("list_tables")
1✔
202
    pages = paginator.paginate(PaginationConfig={"PageSize": 100})
1✔
203
    table_name = to_str(table_name)
1✔
204
    return any(table_name in page["TableNames"] for page in pages)
1✔
205

206

207
class EventForwarder:
1✔
208
    def __init__(self, num_thread: int = 10):
1✔
209
        self.executor = ThreadPoolExecutor(num_thread, thread_name_prefix="ddb_stream_fwd")
1✔
210

211
    def shutdown(self):
1✔
212
        self.executor.shutdown(wait=False)
1✔
213

214
    def forward_to_targets(
1✔
215
        self, account_id: str, region_name: str, records_map: RecordsMap, background: bool = True
216
    ) -> None:
217
        if background:
1✔
218
            self._submit_records(
1✔
219
                account_id=account_id,
220
                region_name=region_name,
221
                records_map=records_map,
222
            )
223
        else:
224
            self._forward(account_id, region_name, records_map)
×
225

226
    def _submit_records(self, account_id: str, region_name: str, records_map: RecordsMap):
1✔
227
        """Required for patching submit with local thread context for EventStudio"""
228
        self.executor.submit(
1✔
229
            self._forward,
230
            account_id,
231
            region_name,
232
            records_map,
233
        )
234

235
    def _forward(self, account_id: str, region_name: str, records_map: RecordsMap) -> None:
1✔
236
        try:
1✔
237
            self.forward_to_kinesis_stream(account_id, region_name, records_map)
1✔
238
        except Exception as e:
×
239
            LOG.debug(
×
240
                "Error while publishing to Kinesis streams: '%s'",
241
                e,
242
                exc_info=LOG.isEnabledFor(logging.DEBUG),
243
            )
244

245
        try:
1✔
246
            self.forward_to_ddb_stream(account_id, region_name, records_map)
1✔
247
        except Exception as e:
×
248
            LOG.debug(
×
249
                "Error while publishing to DynamoDB streams, '%s'",
250
                e,
251
                exc_info=LOG.isEnabledFor(logging.DEBUG),
252
            )
253

254
    @staticmethod
1✔
255
    def forward_to_ddb_stream(account_id: str, region_name: str, records_map: RecordsMap) -> None:
1✔
256
        dynamodbstreams_api.forward_events(account_id, region_name, records_map)
1✔
257

258
    @staticmethod
1✔
259
    def forward_to_kinesis_stream(
1✔
260
        account_id: str, region_name: str, records_map: RecordsMap
261
    ) -> None:
262
        # You can only stream data from DynamoDB to Kinesis Data Streams in the same AWS account and AWS Region as your
263
        # table.
264
        # You can only stream data from a DynamoDB table to one Kinesis data stream.
265
        store = get_store(account_id, region_name)
1✔
266

267
        for table_name, table_records in records_map.items():
1✔
268
            table_stream_type = table_records["table_stream_type"]
1✔
269
            if not table_stream_type.is_kinesis:
1✔
270
                continue
1✔
271

272
            kinesis_records = []
1✔
273

274
            table_arn = arns.dynamodb_table_arn(table_name, account_id, region_name)
1✔
275
            records = table_records["records"]
1✔
276
            table_def = store.table_definitions.get(table_name) or {}
1✔
277
            stream_arn = table_def["KinesisDataStreamDestinations"][-1]["StreamArn"]
1✔
278
            for record in records:
1✔
279
                kinesis_record = dict(
1✔
280
                    tableName=table_name,
281
                    recordFormat="application/json",
282
                    userIdentity=None,
283
                    **record,
284
                )
285
                fields_to_remove = {"StreamViewType", "SequenceNumber"}
1✔
286
                kinesis_record["dynamodb"] = {
1✔
287
                    k: v for k, v in record["dynamodb"].items() if k not in fields_to_remove
288
                }
289
                kinesis_record.pop("eventVersion", None)
1✔
290

291
                hash_keys = list(
1✔
292
                    filter(lambda key: key["KeyType"] == "HASH", table_def["KeySchema"])
293
                )
294
                # TODO: reverse properly how AWS creates the partition key, it seems to be an MD5 hash
295
                kinesis_partition_key = md5(f"{table_name}{hash_keys[0]['AttributeName']}")
1✔
296

297
                kinesis_records.append(
1✔
298
                    {
299
                        "Data": json.dumps(kinesis_record, cls=BytesEncoder),
300
                        "PartitionKey": kinesis_partition_key,
301
                    }
302
                )
303

304
            kinesis = connect_to(
1✔
305
                aws_access_key_id=account_id,
306
                aws_secret_access_key=INTERNAL_AWS_SECRET_ACCESS_KEY,
307
                region_name=region_name,
308
            ).kinesis.request_metadata(service_principal="dynamodb", source_arn=table_arn)
309

310
            kinesis.put_records(
1✔
311
                StreamARN=stream_arn,
312
                Records=kinesis_records,
313
            )
314

315
    @classmethod
1✔
316
    def is_kinesis_stream_exists(cls, stream_arn):
1✔
317
        account_id = extract_account_id_from_arn(stream_arn)
1✔
318
        region_name = extract_region_from_arn(stream_arn)
1✔
319

320
        kinesis = connect_to(
1✔
321
            aws_access_key_id=account_id,
322
            aws_secret_access_key=INTERNAL_AWS_SECRET_ACCESS_KEY,
323
            region_name=region_name,
324
        ).kinesis
325
        stream_name_from_arn = stream_arn.split("/", 1)[1]
1✔
326
        # check if the stream exists in kinesis for the user
327
        filtered = list(
1✔
328
            filter(
329
                lambda stream_name: stream_name == stream_name_from_arn,
330
                kinesis.list_streams()["StreamNames"],
331
            )
332
        )
333
        return bool(filtered)
1✔
334

335

336
class SSEUtils:
1✔
337
    """Utils for server-side encryption (SSE)"""
338

339
    @classmethod
1✔
340
    def get_sse_kms_managed_key(cls, account_id: str, region_name: str):
1✔
341
        from localstack.services.kms import provider
1✔
342

343
        existing_key = MANAGED_KMS_KEYS.get(region_name)
1✔
344
        if existing_key:
1✔
345
            return existing_key
1✔
346
        kms_client = connect_to(
1✔
347
            aws_access_key_id=account_id,
348
            aws_secret_access_key=INTERNAL_AWS_SECRET_ACCESS_KEY,
349
            region_name=region_name,
350
        ).kms
351
        key_data = kms_client.create_key(
1✔
352
            Description="Default key that protects my DynamoDB data when no other key is defined"
353
        )
354
        key_id = key_data["KeyMetadata"]["KeyId"]
1✔
355

356
        provider.set_key_managed(key_id, account_id, region_name)
1✔
357
        MANAGED_KMS_KEYS[region_name] = key_id
1✔
358
        return key_id
1✔
359

360
    @classmethod
1✔
361
    def get_sse_description(cls, account_id: str, region_name: str, data):
1✔
362
        if data.get("Enabled"):
1✔
363
            kms_master_key_id = data.get("KMSMasterKeyId")
1✔
364
            if not kms_master_key_id:
1✔
365
                # this is of course not the actual key for dynamodb, just a better, since existing, mock
366
                kms_master_key_id = cls.get_sse_kms_managed_key(account_id, region_name)
1✔
367
            kms_master_key_id = arns.kms_key_arn(kms_master_key_id, account_id, region_name)
1✔
368
            return {
1✔
369
                "Status": "ENABLED",
370
                "SSEType": "KMS",  # no other value is allowed here
371
                "KMSMasterKeyArn": kms_master_key_id,
372
            }
373
        return {}
×
374

375

376
class ValidationException(CommonServiceException):
1✔
377
    def __init__(self, message: str):
1✔
378
        super().__init__(code="ValidationException", status_code=400, message=message)
1✔
379

380

381
def get_store(account_id: str, region_name: str) -> DynamoDBStore:
1✔
382
    # special case: AWS NoSQL Workbench sends "localhost" as region - replace with proper region here
383
    region_name = DynamoDBProvider.ddb_region_name(region_name)
1✔
384
    return dynamodb_stores[account_id][region_name]
1✔
385

386

387
@contextmanager
1✔
388
def modify_context_region(context: RequestContext, region: str):
1✔
389
    """
390
    Context manager that modifies the region of a `RequestContext`. At the exit, the context is restored to its
391
    original state.
392

393
    :param context: the context to modify
394
    :param region: the modified region
395
    :return: a modified `RequestContext`
396
    """
397
    original_region = context.region
1✔
398
    original_authorization = context.request.headers.get("Authorization")
1✔
399

400
    key = get_ddb_access_key(context.account_id, region)
1✔
401

402
    context.region = region
1✔
403
    context.request.headers["Authorization"] = re.sub(
1✔
404
        AUTH_CREDENTIAL_REGEX,
405
        rf"Credential={key}/\2/{region}/\4/",
406
        original_authorization or "",
407
        flags=re.IGNORECASE,
408
    )
409

410
    try:
1✔
411
        yield context
1✔
412
    except Exception:
1✔
413
        raise
1✔
414
    finally:
415
        # revert the original context
416
        context.region = original_region
1✔
417
        context.request.headers["Authorization"] = original_authorization
1✔
418

419

420
class DynamoDBDeveloperEndpoints:
1✔
421
    """
422
    Developer endpoints for DynamoDB
423
    DELETE /_aws/dynamodb/expired - delete expired items from tables with TTL enabled; return the number of expired
424
        items deleted
425
    """
426

427
    @route("/_aws/dynamodb/expired", methods=["DELETE"])
1✔
428
    def delete_expired_messages(self, _: Request):
1✔
429
        no_expired_items = delete_expired_items()
1✔
430
        return {"ExpiredItems": no_expired_items}
1✔
431

432

433
def delete_expired_items() -> int:
1✔
434
    """
435
    This utility function iterates over all stores, looks for tables with TTL enabled,
436
    scan such tables and delete expired items.
437
    """
438
    no_expired_items = 0
1✔
439
    for account_id, region_name, state in dynamodb_stores.iter_stores():
1✔
440
        ttl_specs = state.ttl_specifications
1✔
441
        client = connect_to(aws_access_key_id=account_id, region_name=region_name).dynamodb
1✔
442
        for table_name, ttl_spec in ttl_specs.items():
1✔
443
            if ttl_spec.get("Enabled", False):
1✔
444
                attribute_name = ttl_spec.get("AttributeName")
1✔
445
                current_time = int(datetime.now().timestamp())
1✔
446
                try:
1✔
447
                    result = client.scan(
1✔
448
                        TableName=table_name,
449
                        FilterExpression="#ttl <= :threshold",
450
                        ExpressionAttributeValues={":threshold": {"N": str(current_time)}},
451
                        ExpressionAttributeNames={"#ttl": attribute_name},
452
                    )
453
                    items_to_delete = result.get("Items", [])
1✔
454
                    no_expired_items += len(items_to_delete)
1✔
455
                    table_description = client.describe_table(TableName=table_name)
1✔
456
                    partition_key, range_key = _get_hash_and_range_key(table_description)
1✔
457
                    keys_to_delete = [
1✔
458
                        {partition_key: item.get(partition_key)}
459
                        if range_key is None
460
                        else {
461
                            partition_key: item.get(partition_key),
462
                            range_key: item.get(range_key),
463
                        }
464
                        for item in items_to_delete
465
                    ]
466
                    delete_requests = [{"DeleteRequest": {"Key": key}} for key in keys_to_delete]
1✔
467
                    for i in range(0, len(delete_requests), 25):
1✔
468
                        batch = delete_requests[i : i + 25]
1✔
469
                        client.batch_write_item(RequestItems={table_name: batch})
1✔
470
                except Exception as e:
×
471
                    LOG.warning(
×
472
                        "An error occurred when deleting expired items from table %s: %s",
473
                        table_name,
474
                        e,
475
                    )
476
    return no_expired_items
1✔
477

478

479
def _get_hash_and_range_key(table_description: DescribeTableOutput) -> [str, str | None]:
1✔
480
    key_schema = table_description.get("Table", {}).get("KeySchema", [])
1✔
481
    hash_key, range_key = None, None
1✔
482
    for key in key_schema:
1✔
483
        if key["KeyType"] == "HASH":
1✔
484
            hash_key = key["AttributeName"]
1✔
485
        if key["KeyType"] == "RANGE":
1✔
486
            range_key = key["AttributeName"]
1✔
487
    return hash_key, range_key
1✔
488

489

490
class ExpiredItemsWorker:
1✔
491
    """A worker that periodically computes and deletes expired items from DynamoDB tables"""
492

493
    def __init__(self) -> None:
1✔
494
        super().__init__()
1✔
495
        self.scheduler = Scheduler()
1✔
496
        self.thread: FuncThread | None = None
1✔
497
        self.mutex = threading.RLock()
1✔
498

499
    def start(self):
1✔
500
        with self.mutex:
×
501
            if self.thread:
×
502
                return
×
503

504
            self.scheduler = Scheduler()
×
505
            self.scheduler.schedule(
×
506
                delete_expired_items, period=60 * 60
507
            )  # the background process seems slow on AWS
508

509
            def _run(*_args):
×
510
                self.scheduler.run()
×
511

512
            self.thread = start_thread(_run, name="ddb-remove-expired-items")
×
513

514
    def stop(self):
1✔
515
        with self.mutex:
1✔
516
            if self.scheduler:
1✔
517
                self.scheduler.close()
1✔
518

519
            if self.thread:
1✔
520
                self.thread.stop()
×
521

522
            self.thread = None
1✔
523
            self.scheduler = None
1✔
524

525

526
class DynamoDBProvider(DynamodbApi, ServiceLifecycleHook):
1✔
527
    server: DynamodbServer
1✔
528
    """The instance of the server managing the instance of DynamoDB local"""
1✔
529
    asset_directory = f"{config.dirs.data}/dynamodb"
1✔
530
    """The directory that contains the .db files saved by DynamoDB Local"""
1✔
531
    tmp_asset_directory = f"{config.dirs.tmp}/dynamodb"
1✔
532
    """Temporary directory for the .db files saved by DynamoDB Local when MANUAL snapshot persistence is enabled"""
1✔
533

534
    def __init__(self):
1✔
535
        self.server = self._new_dynamodb_server()
1✔
536
        self._expired_items_worker = ExpiredItemsWorker()
1✔
537
        self._router_rules = []
1✔
538
        # TODO: fix _event_forwarder to have lazy instantiation of the ThreadPoolExecutor
539
        self._event_forwarder = EventForwarder()
1✔
540

541
    def on_before_start(self):
1✔
542
        # We must copy back whatever state is saved to the temporary location to avoid to start always from a blank
543
        # state. See the `on_before_state_save` hook.
544
        if is_persistence_enabled() and config.SNAPSHOT_SAVE_STRATEGY == "MANUAL":
1✔
545
            if os.path.exists(self.asset_directory):
×
546
                LOG.debug("Copying %s to %s", self.tmp_asset_directory, self.asset_directory)
×
547
                cp_r(self.asset_directory, self.tmp_asset_directory, rm_dest_on_conflict=True)
×
548

549
        self.server.start_dynamodb()
1✔
550
        if config.DYNAMODB_REMOVE_EXPIRED_ITEMS:
1✔
551
            self._expired_items_worker.start()
×
552
        self._router_rules = ROUTER.add(DynamoDBDeveloperEndpoints())
1✔
553

554
    def on_before_stop(self):
1✔
555
        self._expired_items_worker.stop()
1✔
556
        ROUTER.remove(self._router_rules)
1✔
557
        self._event_forwarder.shutdown()
1✔
558

559
    def accept_state_visitor(self, visitor: StateVisitor):
1✔
560
        visitor.visit(dynamodb_stores)
×
561
        visitor.visit(dynamodbstreams_stores)
×
562
        visitor.visit(AssetDirectory(self.service, os.path.join(config.dirs.data, self.service)))
×
563

564
    def on_before_state_reset(self):
1✔
565
        self.server.stop_dynamodb()
×
566
        rm_rf(self.tmp_asset_directory)
×
567

568
    def on_before_state_load(self):
1✔
569
        self.server.stop_dynamodb()
×
570

571
    def on_after_state_reset(self):
1✔
572
        self.server.start_dynamodb()
×
573

574
    @staticmethod
1✔
575
    def _new_dynamodb_server() -> DynamodbServer:
1✔
576
        return DynamodbServer.get()
1✔
577

578
    def on_after_state_load(self):
1✔
579
        self.server.start_dynamodb()
×
580

581
    def on_before_state_save(self) -> None:
1✔
582
        # When the save strategy is MANUAL, we do not save the DB path to the usual ``confid.dirs.data`` folder.
583
        # With the MANUAL strategy, we want to take a snapshot on-demand but this is not possible if the DB files
584
        # are already in ``config.dirs.data``. For instance, the set of operation below will result in both tables
585
        # being implicitly saved.
586
        # - awslocal dynamodb create-table table1
587
        # - curl -X POST http://localhost:4566/_localstack/state/save
588
        # - awslocal dynamodb create-table table2
589
        # To avoid this problem, we start the DDBLocal server in a temporary directory that is then copied over
590
        # ``config.dirs.data`` when the save needs to be saved.
591
        # The ideal solution to the problem would be to always start the server in memory and have a dump capability.
592
        if is_persistence_enabled() and config.SNAPSHOT_SAVE_STRATEGY == "MANUAL":
×
593
            LOG.debug("Copying %s to %s", self.tmp_asset_directory, self.asset_directory)
×
594
            cp_r(self.tmp_asset_directory, self.asset_directory, rm_dest_on_conflict=True)
×
595

596
    def on_after_init(self):
1✔
597
        # add response processor specific to ddblocal
598
        handlers.modify_service_response.append(self.service, modify_ddblocal_arns)
1✔
599

600
        # routes for the shell ui
601
        ROUTER.add(
1✔
602
            path="/shell",
603
            endpoint=self.handle_shell_ui_redirect,
604
            methods=["GET"],
605
        )
606
        ROUTER.add(
1✔
607
            path="/shell/<regex('.*'):req_path>",
608
            endpoint=self.handle_shell_ui_request,
609
        )
610

611
    def _forward_request(
1✔
612
        self,
613
        context: RequestContext,
614
        region: str | None,
615
        service_request: ServiceRequest | None = None,
616
    ) -> ServiceResponse:
617
        """
618
        Modify the context region and then forward request to DynamoDB Local.
619

620
        This is used for operations impacted by global tables. In LocalStack, a single copy of global table
621
        is kept, and any requests to replicated tables are forwarded to this original table.
622
        """
623
        if region:
1✔
624
            with modify_context_region(context, region):
1✔
625
                return self.forward_request(context, service_request=service_request)
1✔
626
        return self.forward_request(context, service_request=service_request)
×
627

628
    def forward_request(
1✔
629
        self, context: RequestContext, service_request: ServiceRequest = None
630
    ) -> ServiceResponse:
631
        """
632
        Forward a request to DynamoDB Local.
633
        """
634
        self.check_provisioned_throughput(context.operation.name)
1✔
635
        self.prepare_request_headers(
1✔
636
            context.request.headers, account_id=context.account_id, region_name=context.region
637
        )
638
        return self.server.proxy(context, service_request)
1✔
639

640
    def get_forward_url(self, account_id: str, region_name: str) -> str:
1✔
641
        """Return the URL of the backend DynamoDBLocal server to forward requests to"""
642
        return self.server.url
×
643

644
    def handle_shell_ui_redirect(self, request: werkzeug.Request) -> Response:
1✔
645
        headers = {"Refresh": f"0; url={config.external_service_url()}/shell/index.html"}
×
646
        return Response("", headers=headers)
×
647

648
    def handle_shell_ui_request(self, request: werkzeug.Request, req_path: str) -> Response:
1✔
649
        # TODO: "DynamoDB Local Web Shell was deprecated with version 1.16.X and is not available any
650
        #  longer from 1.17.X to latest. There are no immediate plans for a new Web Shell to be introduced."
651
        #  -> keeping this for now, to allow configuring custom installs; should consider removing it in the future
652
        # https://repost.aws/questions/QUHyIzoEDqQ3iOKlUEp1LPWQ#ANdBm9Nz9TRf6VqR3jZtcA1g
653
        req_path = f"/{req_path}" if not req_path.startswith("/") else req_path
×
654
        account_id = extract_account_id_from_headers(request.headers)
×
655
        region_name = extract_region_from_headers(request.headers)
×
656
        url = f"{self.get_forward_url(account_id, region_name)}/shell{req_path}"
×
657
        result = requests.request(
×
658
            method=request.method, url=url, headers=request.headers, data=request.data
659
        )
660
        return Response(result.content, headers=dict(result.headers), status=result.status_code)
×
661

662
    #
663
    # Table ops
664
    #
665

666
    @handler("CreateTable", expand=False)
1✔
667
    def create_table(
1✔
668
        self,
669
        context: RequestContext,
670
        create_table_input: CreateTableInput,
671
    ) -> CreateTableOutput:
672
        table_name = create_table_input["TableName"]
1✔
673

674
        # Return this specific error message to keep parity with AWS
675
        if self.table_exists(context.account_id, context.region, table_name):
1✔
676
            raise ResourceInUseException(f"Table already exists: {table_name}")
1✔
677

678
        billing_mode = create_table_input.get("BillingMode")
1✔
679
        provisioned_throughput = create_table_input.get("ProvisionedThroughput")
1✔
680
        if billing_mode == BillingMode.PAY_PER_REQUEST and provisioned_throughput is not None:
1✔
681
            raise ValidationException(
1✔
682
                "One or more parameter values were invalid: Neither ReadCapacityUnits nor WriteCapacityUnits can be "
683
                "specified when BillingMode is PAY_PER_REQUEST"
684
            )
685

686
        result = self.forward_request(context)
1✔
687

688
        table_description = result["TableDescription"]
1✔
689
        table_description["TableArn"] = table_arn = self.fix_table_arn(
1✔
690
            context.account_id, context.region, table_description["TableArn"]
691
        )
692

693
        backend = get_store(context.account_id, context.region)
1✔
694
        backend.table_definitions[table_name] = table_definitions = dict(create_table_input)
1✔
695
        backend.TABLE_REGION[table_name] = context.region
1✔
696

697
        if "TableId" not in table_definitions:
1✔
698
            table_definitions["TableId"] = long_uid()
1✔
699

700
        if "SSESpecification" in table_definitions:
1✔
701
            sse_specification = table_definitions.pop("SSESpecification")
1✔
702
            table_definitions["SSEDescription"] = SSEUtils.get_sse_description(
1✔
703
                context.account_id, context.region, sse_specification
704
            )
705

706
        if table_definitions:
1✔
707
            table_content = result.get("Table", {})
1✔
708
            table_content.update(table_definitions)
1✔
709
            table_description.update(table_content)
1✔
710

711
        if "StreamSpecification" in table_definitions:
1✔
712
            create_dynamodb_stream(
1✔
713
                context.account_id,
714
                context.region,
715
                table_definitions,
716
                table_description.get("LatestStreamLabel"),
717
            )
718

719
        if "TableClass" in table_definitions:
1✔
720
            table_class = table_description.pop("TableClass", None) or table_definitions.pop(
1✔
721
                "TableClass"
722
            )
723
            table_description["TableClassSummary"] = {"TableClass": table_class}
1✔
724

725
        if "GlobalSecondaryIndexes" in table_description:
1✔
726
            gsis = copy.deepcopy(table_description["GlobalSecondaryIndexes"])
1✔
727
            # update the different values, as DynamoDB-local v2 has a regression around GSI and does not return anything
728
            # anymore
729
            for gsi in gsis:
1✔
730
                index_name = gsi.get("IndexName", "")
1✔
731
                gsi.update(
1✔
732
                    {
733
                        "IndexArn": f"{table_arn}/index/{index_name}",
734
                        "IndexSizeBytes": 0,
735
                        "IndexStatus": "ACTIVE",
736
                        "ItemCount": 0,
737
                    }
738
                )
739
                gsi_provisioned_throughput = gsi.setdefault("ProvisionedThroughput", {})
1✔
740
                gsi_provisioned_throughput["NumberOfDecreasesToday"] = 0
1✔
741

742
                if billing_mode == BillingMode.PAY_PER_REQUEST:
1✔
743
                    gsi_provisioned_throughput["ReadCapacityUnits"] = 0
1✔
744
                    gsi_provisioned_throughput["WriteCapacityUnits"] = 0
1✔
745

746
            table_description["GlobalSecondaryIndexes"] = gsis
1✔
747

748
        if "ProvisionedThroughput" in table_description:
1✔
749
            if "NumberOfDecreasesToday" not in table_description["ProvisionedThroughput"]:
1✔
750
                table_description["ProvisionedThroughput"]["NumberOfDecreasesToday"] = 0
1✔
751

752
        if "WarmThroughput" in table_description:
1✔
753
            table_description["WarmThroughput"]["Status"] = "UPDATING"
1✔
754

755
        tags = table_definitions.pop("Tags", [])
1✔
756
        if tags:
1✔
757
            get_store(context.account_id, context.region).TABLE_TAGS[table_arn] = {
1✔
758
                tag["Key"]: tag["Value"] for tag in tags
759
            }
760

761
        # remove invalid attributes from result
762
        table_description.pop("Tags", None)
1✔
763
        table_description.pop("BillingMode", None)
1✔
764

765
        return result
1✔
766

767
    def delete_table(
1✔
768
        self, context: RequestContext, table_name: TableName, **kwargs
769
    ) -> DeleteTableOutput:
770
        global_table_region = self.get_global_table_region(context, table_name)
1✔
771

772
        self.ensure_table_exists(
1✔
773
            context.account_id,
774
            global_table_region,
775
            table_name,
776
            error_message=f"Requested resource not found: Table: {table_name} not found",
777
        )
778

779
        # Limitation note: On AWS, for a replicated table, if the source table is deleted, the replicated tables continue to exist.
780
        # This is not the case for LocalStack, where all replicated tables will also be removed if source is deleted.
781

782
        result = self._forward_request(context=context, region=global_table_region)
1✔
783

784
        table_arn = result.get("TableDescription", {}).get("TableArn")
1✔
785
        table_arn = self.fix_table_arn(context.account_id, context.region, table_arn)
1✔
786
        dynamodbstreams_api.delete_streams(context.account_id, context.region, table_arn)
1✔
787

788
        store = get_store(context.account_id, context.region)
1✔
789
        store.TABLE_TAGS.pop(table_arn, None)
1✔
790
        store.REPLICAS.pop(table_name, None)
1✔
791

792
        return result
1✔
793

794
    def describe_table(
1✔
795
        self, context: RequestContext, table_name: TableName, **kwargs
796
    ) -> DescribeTableOutput:
797
        global_table_region = self.get_global_table_region(context, table_name)
1✔
798

799
        result = self._forward_request(context=context, region=global_table_region)
1✔
800
        table_description: TableDescription = result["Table"]
1✔
801

802
        # Update table properties from LocalStack stores
803
        if table_props := get_store(context.account_id, context.region).table_properties.get(
1✔
804
            table_name
805
        ):
806
            table_description.update(table_props)
1✔
807

808
        store = get_store(context.account_id, context.region)
1✔
809

810
        # Update replication details
811
        replicas: dict[RegionName, ReplicaDescription] = store.REPLICAS.get(table_name, {})
1✔
812

813
        replica_description_list = []
1✔
814

815
        if global_table_region != context.region:
1✔
816
            replica_description_list.append(
1✔
817
                ReplicaDescription(
818
                    RegionName=global_table_region, ReplicaStatus=ReplicaStatus.ACTIVE
819
                )
820
            )
821

822
        for replica_region, replica_description in replicas.items():
1✔
823
            # The replica in the region being queried must not be returned
824
            if replica_region != context.region:
1✔
825
                replica_description_list.append(replica_description)
1✔
826

827
        if replica_description_list:
1✔
828
            table_description.update({"Replicas": replica_description_list})
1✔
829

830
        # update only TableId and SSEDescription if present
831
        if table_definitions := store.table_definitions.get(table_name):
1✔
832
            for key in ["TableId", "SSEDescription"]:
1✔
833
                if table_definitions.get(key):
1✔
834
                    table_description[key] = table_definitions[key]
1✔
835
            if "TableClass" in table_definitions:
1✔
836
                table_description["TableClassSummary"] = {
1✔
837
                    "TableClass": table_definitions["TableClass"]
838
                }
839
            if warm_throughput := table_definitions.get("WarmThroughput"):
1✔
840
                table_description["WarmThroughput"] = warm_throughput.copy()
1✔
841
                table_description["WarmThroughput"].setdefault("Status", "ACTIVE")
1✔
842

843
        if "GlobalSecondaryIndexes" in table_description:
1✔
844
            for gsi in table_description["GlobalSecondaryIndexes"]:
1✔
845
                default_values = {
1✔
846
                    "NumberOfDecreasesToday": 0,
847
                    "ReadCapacityUnits": 0,
848
                    "WriteCapacityUnits": 0,
849
                }
850
                # even if the billing mode is PAY_PER_REQUEST, AWS returns the Read and Write Capacity Units
851
                # Terraform depends on this parity for update operations
852
                gsi["ProvisionedThroughput"] = default_values | gsi.get("ProvisionedThroughput", {})
1✔
853

854
        # Set defaults for warm throughput
855
        if "WarmThroughput" not in table_description:
1✔
856
            billing_mode = table_definitions.get("BillingMode") if table_definitions else None
1✔
857
            table_description["WarmThroughput"] = {
1✔
858
                "ReadUnitsPerSecond": 12000 if billing_mode == "PAY_PER_REQUEST" else 5,
859
                "WriteUnitsPerSecond": 4000 if billing_mode == "PAY_PER_REQUEST" else 5,
860
            }
861
        table_description["WarmThroughput"]["Status"] = (
1✔
862
            table_description.get("TableStatus") or "ACTIVE"
863
        )
864

865
        return DescribeTableOutput(
1✔
866
            Table=select_from_typed_dict(TableDescription, table_description)
867
        )
868

869
    @handler("UpdateTable", expand=False)
1✔
870
    def update_table(
1✔
871
        self, context: RequestContext, update_table_input: UpdateTableInput
872
    ) -> UpdateTableOutput:
873
        table_name = update_table_input["TableName"]
1✔
874
        global_table_region = self.get_global_table_region(context, table_name)
1✔
875

876
        try:
1✔
877
            result = self._forward_request(context=context, region=global_table_region)
1✔
878
        except CommonServiceException as exc:
1✔
879
            # DynamoDBLocal refuses to update certain table params and raises.
880
            # But we still need to update this info in LocalStack stores
881
            if not (exc.code == "ValidationException" and exc.message == "Nothing to update"):
1✔
882
                raise
×
883

884
            if table_class := update_table_input.get("TableClass"):
1✔
885
                table_definitions = get_store(
1✔
886
                    context.account_id, context.region
887
                ).table_definitions.setdefault(table_name, {})
888
                table_definitions["TableClass"] = table_class
1✔
889

890
            if replica_updates := update_table_input.get("ReplicaUpdates"):
1✔
891
                store = get_store(context.account_id, global_table_region)
1✔
892

893
                # Dict with source region to set of replicated regions
894
                replicas: dict[RegionName, ReplicaDescription] = store.REPLICAS.get(table_name, {})
1✔
895

896
                for replica_update in replica_updates:
1✔
897
                    for key, details in replica_update.items():
1✔
898
                        # Replicated region
899
                        target_region = details.get("RegionName")
1✔
900

901
                        # Check if replicated region is valid
902
                        if target_region not in get_valid_regions_for_service("dynamodb"):
1✔
903
                            raise ValidationException(f"Region {target_region} is not supported")
×
904

905
                        match key:
1✔
906
                            case "Create":
1✔
907
                                if target_region in replicas:
1✔
908
                                    raise ValidationException(
×
909
                                        f"Failed to create a the new replica of table with name: '{table_name}' because one or more replicas already existed as tables."
910
                                    )
911
                                replicas[target_region] = ReplicaDescription(
1✔
912
                                    RegionName=target_region,
913
                                    KMSMasterKeyId=details.get("KMSMasterKeyId"),
914
                                    ProvisionedThroughputOverride=details.get(
915
                                        "ProvisionedThroughputOverride"
916
                                    ),
917
                                    GlobalSecondaryIndexes=details.get("GlobalSecondaryIndexes"),
918
                                    ReplicaStatus=ReplicaStatus.ACTIVE,
919
                                )
920
                            case "Delete":
1✔
921
                                try:
1✔
922
                                    replicas.pop(target_region)
1✔
923
                                except KeyError:
1✔
924
                                    raise ValidationException(
1✔
925
                                        "Update global table operation failed because one or more replicas were not part of the global table."
926
                                    )
927

928
                store.REPLICAS[table_name] = replicas
1✔
929

930
            # update response content
931
            SchemaExtractor.invalidate_table_schema(
1✔
932
                table_name, context.account_id, global_table_region
933
            )
934

935
            schema = SchemaExtractor.get_table_schema(
1✔
936
                table_name, context.account_id, global_table_region
937
            )
938

939
            if sse_specification_input := update_table_input.get("SSESpecification"):
1✔
940
                # If SSESpecification is changed, update store and return the 'UPDATING' status in the response
941
                table_definition = get_store(
1✔
942
                    context.account_id, context.region
943
                ).table_definitions.setdefault(table_name, {})
944
                if not sse_specification_input["Enabled"]:
1✔
945
                    table_definition.pop("SSEDescription", None)
1✔
946
                    schema["Table"]["SSEDescription"]["Status"] = "UPDATING"
1✔
947

948
            return UpdateTableOutput(TableDescription=schema["Table"])
1✔
949

950
        SchemaExtractor.invalidate_table_schema(table_name, context.account_id, global_table_region)
1✔
951

952
        schema = SchemaExtractor.get_table_schema(
1✔
953
            table_name, context.account_id, global_table_region
954
        )
955

956
        # TODO: DDB streams must also be created for replicas
957
        if update_table_input.get("StreamSpecification"):
1✔
958
            create_dynamodb_stream(
1✔
959
                context.account_id,
960
                context.region,
961
                update_table_input,
962
                result["TableDescription"].get("LatestStreamLabel"),
963
            )
964

965
        return UpdateTableOutput(TableDescription=schema["Table"])
1✔
966

967
    def list_tables(
1✔
968
        self,
969
        context: RequestContext,
970
        exclusive_start_table_name: TableName = None,
971
        limit: ListTablesInputLimit = None,
972
        **kwargs,
973
    ) -> ListTablesOutput:
974
        response = self.forward_request(context)
1✔
975

976
        # Add replicated tables
977
        replicas = get_store(context.account_id, context.region).REPLICAS
1✔
978
        for replicated_table, replications in replicas.items():
1✔
979
            for replica_region, replica_description in replications.items():
1✔
980
                if context.region == replica_region:
1✔
981
                    response["TableNames"].append(replicated_table)
1✔
982

983
        return response
1✔
984

985
    #
986
    # Contributor Insights
987
    #
988

989
    @handler("DescribeContributorInsights", expand=False)
1✔
990
    def describe_contributor_insights(
1✔
991
        self,
992
        context: RequestContext,
993
        describe_contributor_insights_input: DescribeContributorInsightsInput,
994
    ) -> DescribeContributorInsightsOutput:
995
        return DescribeContributorInsightsOutput(
1✔
996
            TableName=describe_contributor_insights_input["TableName"],
997
            IndexName=describe_contributor_insights_input.get("IndexName"),
998
            ContributorInsightsStatus="DISABLED",
999
        )
1000

1001
    #
1002
    # Item ops
1003
    #
1004

1005
    @handler("PutItem", expand=False)
1✔
1006
    def put_item(self, context: RequestContext, put_item_input: PutItemInput) -> PutItemOutput:
1✔
1007
        table_name = put_item_input["TableName"]
1✔
1008
        global_table_region = self.get_global_table_region(context, table_name)
1✔
1009

1010
        has_return_values = put_item_input.get("ReturnValues") == "ALL_OLD"
1✔
1011
        stream_type = get_table_stream_type(context.account_id, context.region, table_name)
1✔
1012

1013
        # if the request doesn't ask for ReturnValues and we have stream enabled, we need to modify the request to
1014
        # force DDBLocal to return those values
1015
        if stream_type and not has_return_values:
1✔
1016
            service_req = copy.copy(context.service_request)
1✔
1017
            service_req["ReturnValues"] = "ALL_OLD"
1✔
1018
            result = self._forward_request(
1✔
1019
                context=context, region=global_table_region, service_request=service_req
1020
            )
1021
        else:
1022
            result = self._forward_request(context=context, region=global_table_region)
1✔
1023

1024
        # Since this operation makes use of global table region, we need to use the same region for all
1025
        # calls made via the inter-service client. This is taken care of by passing the account ID and
1026
        # region, e.g. when getting the stream spec
1027

1028
        # Get stream specifications details for the table
1029
        if stream_type:
1✔
1030
            item = put_item_input["Item"]
1✔
1031
            # prepare record keys
1032
            keys = SchemaExtractor.extract_keys(
1✔
1033
                item=item,
1034
                table_name=table_name,
1035
                account_id=context.account_id,
1036
                region_name=global_table_region,
1037
            )
1038
            # because we modified the request, we will always have the ReturnValues if we have streams enabled
1039
            if has_return_values:
1✔
1040
                existing_item = result.get("Attributes")
×
1041
            else:
1042
                # remove the ReturnValues if the client didn't ask for it
1043
                existing_item = result.pop("Attributes", None)
1✔
1044

1045
            if existing_item == item:
1✔
1046
                return result
1✔
1047

1048
            # create record
1049
            record = self.get_record_template(
1✔
1050
                context.region,
1051
            )
1052
            record["eventName"] = "INSERT" if not existing_item else "MODIFY"
1✔
1053
            record["dynamodb"]["Keys"] = keys
1✔
1054
            record["dynamodb"]["SizeBytes"] = _get_size_bytes(item)
1✔
1055

1056
            if stream_type.needs_new_image:
1✔
1057
                record["dynamodb"]["NewImage"] = item
1✔
1058
            if stream_type.stream_view_type:
1✔
1059
                record["dynamodb"]["StreamViewType"] = stream_type.stream_view_type
1✔
1060
            if existing_item and stream_type.needs_old_image:
1✔
1061
                record["dynamodb"]["OldImage"] = existing_item
1✔
1062

1063
            records_map = {
1✔
1064
                table_name: TableRecords(records=[record], table_stream_type=stream_type)
1065
            }
1066
            self.forward_stream_records(context.account_id, context.region, records_map)
1✔
1067
        return result
1✔
1068

1069
    @handler("DeleteItem", expand=False)
1✔
1070
    def delete_item(
1✔
1071
        self,
1072
        context: RequestContext,
1073
        delete_item_input: DeleteItemInput,
1074
    ) -> DeleteItemOutput:
1075
        table_name = delete_item_input["TableName"]
1✔
1076
        global_table_region = self.get_global_table_region(context, table_name)
1✔
1077

1078
        has_return_values = delete_item_input.get("ReturnValues") == "ALL_OLD"
1✔
1079
        stream_type = get_table_stream_type(context.account_id, context.region, table_name)
1✔
1080

1081
        # if the request doesn't ask for ReturnValues and we have stream enabled, we need to modify the request to
1082
        # force DDBLocal to return those values
1083
        if stream_type and not has_return_values:
1✔
1084
            service_req = copy.copy(context.service_request)
1✔
1085
            service_req["ReturnValues"] = "ALL_OLD"
1✔
1086
            result = self._forward_request(
1✔
1087
                context=context, region=global_table_region, service_request=service_req
1088
            )
1089
        else:
1090
            result = self._forward_request(context=context, region=global_table_region)
1✔
1091

1092
        # determine and forward stream record
1093
        if stream_type:
1✔
1094
            # because we modified the request, we will always have the ReturnValues if we have streams enabled
1095
            if has_return_values:
1✔
1096
                existing_item = result.get("Attributes")
×
1097
            else:
1098
                # remove the ReturnValues if the client didn't ask for it
1099
                existing_item = result.pop("Attributes", None)
1✔
1100

1101
            if not existing_item:
1✔
1102
                return result
×
1103

1104
            # create record
1105
            record = self.get_record_template(context.region)
1✔
1106
            record["eventName"] = "REMOVE"
1✔
1107
            record["dynamodb"]["Keys"] = delete_item_input["Key"]
1✔
1108
            record["dynamodb"]["SizeBytes"] = _get_size_bytes(existing_item)
1✔
1109

1110
            if stream_type.stream_view_type:
1✔
1111
                record["dynamodb"]["StreamViewType"] = stream_type.stream_view_type
1✔
1112
            if stream_type.needs_old_image:
1✔
1113
                record["dynamodb"]["OldImage"] = existing_item
1✔
1114

1115
            records_map = {
1✔
1116
                table_name: TableRecords(records=[record], table_stream_type=stream_type)
1117
            }
1118
            self.forward_stream_records(context.account_id, context.region, records_map)
1✔
1119

1120
        return result
1✔
1121

1122
    @handler("UpdateItem", expand=False)
1✔
1123
    def update_item(
1✔
1124
        self,
1125
        context: RequestContext,
1126
        update_item_input: UpdateItemInput,
1127
    ) -> UpdateItemOutput:
1128
        # TODO: UpdateItem is harder to use ReturnValues for Streams, because it needs the Before and After images.
1129
        table_name = update_item_input["TableName"]
1✔
1130
        global_table_region = self.get_global_table_region(context, table_name)
1✔
1131

1132
        existing_item = None
1✔
1133
        stream_type = get_table_stream_type(context.account_id, context.region, table_name)
1✔
1134

1135
        # even if we don't need the OldImage, we still need to fetch the existing item to know if the event is INSERT
1136
        # or MODIFY (UpdateItem will create the object if it doesn't exist, and you don't use a ConditionExpression)
1137
        if stream_type:
1✔
1138
            existing_item = ItemFinder.find_existing_item(
1✔
1139
                put_item=update_item_input,
1140
                table_name=table_name,
1141
                account_id=context.account_id,
1142
                region_name=context.region,
1143
                endpoint_url=self.server.url,
1144
            )
1145

1146
        result = self._forward_request(context=context, region=global_table_region)
1✔
1147

1148
        # construct and forward stream record
1149
        if stream_type:
1✔
1150
            updated_item = ItemFinder.find_existing_item(
1✔
1151
                put_item=update_item_input,
1152
                table_name=table_name,
1153
                account_id=context.account_id,
1154
                region_name=context.region,
1155
                endpoint_url=self.server.url,
1156
            )
1157
            if not updated_item or updated_item == existing_item:
1✔
1158
                return result
×
1159

1160
            record = self.get_record_template(context.region)
1✔
1161
            record["eventName"] = "INSERT" if not existing_item else "MODIFY"
1✔
1162
            record["dynamodb"]["Keys"] = update_item_input["Key"]
1✔
1163
            record["dynamodb"]["SizeBytes"] = _get_size_bytes(updated_item)
1✔
1164

1165
            if stream_type.stream_view_type:
1✔
1166
                record["dynamodb"]["StreamViewType"] = stream_type.stream_view_type
1✔
1167
            if existing_item and stream_type.needs_old_image:
1✔
1168
                record["dynamodb"]["OldImage"] = existing_item
1✔
1169
            if stream_type.needs_new_image:
1✔
1170
                record["dynamodb"]["NewImage"] = updated_item
1✔
1171

1172
            records_map = {
1✔
1173
                table_name: TableRecords(records=[record], table_stream_type=stream_type)
1174
            }
1175
            self.forward_stream_records(context.account_id, context.region, records_map)
1✔
1176

1177
        return result
1✔
1178

1179
    @handler("GetItem", expand=False)
1✔
1180
    def get_item(self, context: RequestContext, get_item_input: GetItemInput) -> GetItemOutput:
1✔
1181
        table_name = get_item_input["TableName"]
1✔
1182
        global_table_region = self.get_global_table_region(context, table_name)
1✔
1183
        result = self._forward_request(context=context, region=global_table_region)
1✔
1184
        self.fix_consumed_capacity(get_item_input, result)
1✔
1185
        return result
1✔
1186

1187
    #
1188
    # Queries
1189
    #
1190

1191
    @handler("Query", expand=False)
1✔
1192
    def query(self, context: RequestContext, query_input: QueryInput) -> QueryOutput:
1✔
1193
        index_name = query_input.get("IndexName")
1✔
1194
        if index_name:
1✔
1195
            if not is_index_query_valid(context.account_id, context.region, query_input):
1✔
1196
                raise ValidationException(
1✔
1197
                    "One or more parameter values were invalid: Select type ALL_ATTRIBUTES "
1198
                    "is not supported for global secondary index id-index because its projection "
1199
                    "type is not ALL",
1200
                )
1201

1202
        table_name = query_input["TableName"]
1✔
1203
        global_table_region = self.get_global_table_region(context, table_name)
1✔
1204
        result = self._forward_request(context=context, region=global_table_region)
1✔
1205
        self.fix_consumed_capacity(query_input, result)
1✔
1206
        return result
1✔
1207

1208
    @handler("Scan", expand=False)
1✔
1209
    def scan(self, context: RequestContext, scan_input: ScanInput) -> ScanOutput:
1✔
1210
        table_name = scan_input["TableName"]
1✔
1211
        global_table_region = self.get_global_table_region(context, table_name)
1✔
1212
        result = self._forward_request(context=context, region=global_table_region)
1✔
1213
        return result
1✔
1214

1215
    #
1216
    # Batch ops
1217
    #
1218

1219
    @handler("BatchWriteItem", expand=False)
1✔
1220
    def batch_write_item(
1✔
1221
        self,
1222
        context: RequestContext,
1223
        batch_write_item_input: BatchWriteItemInput,
1224
    ) -> BatchWriteItemOutput:
1225
        # TODO: add global table support
1226
        existing_items = {}
1✔
1227
        existing_items_to_fetch: BatchWriteItemRequestMap = {}
1✔
1228
        # UnprocessedItems should have the same format as RequestItems
1229
        unprocessed_items = {}
1✔
1230
        request_items = batch_write_item_input["RequestItems"]
1✔
1231

1232
        tables_stream_type: dict[TableName, TableStreamType] = {}
1✔
1233

1234
        for table_name, items in sorted(request_items.items(), key=itemgetter(0)):
1✔
1235
            if stream_type := get_table_stream_type(context.account_id, context.region, table_name):
1✔
1236
                tables_stream_type[table_name] = stream_type
1✔
1237

1238
            for request in items:
1✔
1239
                request: WriteRequest
1240
                for key, inner_request in request.items():
1✔
1241
                    inner_request: PutRequest | DeleteRequest
1242
                    if self.should_throttle("BatchWriteItem"):
1✔
1243
                        unprocessed_items_for_table = unprocessed_items.setdefault(table_name, [])
×
1244
                        unprocessed_items_for_table.append(request)
×
1245

1246
                    elif stream_type:
1✔
1247
                        existing_items_to_fetch_for_table = existing_items_to_fetch.setdefault(
1✔
1248
                            table_name, []
1249
                        )
1250
                        existing_items_to_fetch_for_table.append(inner_request)
1✔
1251

1252
        if existing_items_to_fetch:
1✔
1253
            existing_items = ItemFinder.find_existing_items(
1✔
1254
                put_items_per_table=existing_items_to_fetch,
1255
                account_id=context.account_id,
1256
                region_name=context.region,
1257
                endpoint_url=self.server.url,
1258
            )
1259

1260
        try:
1✔
1261
            result = self.forward_request(context)
1✔
1262
        except CommonServiceException as e:
1✔
1263
            # TODO: validate if DynamoDB still raises `One of the required keys was not given a value`
1264
            # for now, replace with the schema error validation
1265
            if e.message == "One of the required keys was not given a value":
1✔
1266
                raise ValidationException("The provided key element does not match the schema")
1✔
1267
            raise e
×
1268

1269
        # determine and forward stream records
1270
        if tables_stream_type:
1✔
1271
            records_map = self.prepare_batch_write_item_records(
1✔
1272
                account_id=context.account_id,
1273
                region_name=context.region,
1274
                tables_stream_type=tables_stream_type,
1275
                request_items=request_items,
1276
                existing_items=existing_items,
1277
            )
1278
            self.forward_stream_records(context.account_id, context.region, records_map)
1✔
1279

1280
        # TODO: should unprocessed item which have mutated by `prepare_batch_write_item_records` be returned
1281
        for table_name, unprocessed_items_in_table in unprocessed_items.items():
1✔
1282
            unprocessed: dict = result["UnprocessedItems"]
×
1283
            result_unprocessed_table = unprocessed.setdefault(table_name, [])
×
1284

1285
            # add the Unprocessed items to the response
1286
            # TODO: check before if the same request has not been Unprocessed by DDB local already?
1287
            # those might actually have been processed? shouldn't we remove them from the proxied request?
1288
            for request in unprocessed_items_in_table:
×
1289
                result_unprocessed_table.append(request)
×
1290

1291
            # remove any table entry if it's empty
1292
            result["UnprocessedItems"] = {k: v for k, v in unprocessed.items() if v}
×
1293

1294
        return result
1✔
1295

1296
    @handler("BatchGetItem")
1✔
1297
    def batch_get_item(
1✔
1298
        self,
1299
        context: RequestContext,
1300
        request_items: BatchGetRequestMap,
1301
        return_consumed_capacity: ReturnConsumedCapacity = None,
1302
        **kwargs,
1303
    ) -> BatchGetItemOutput:
1304
        # TODO: add global table support
1305
        return self.forward_request(context)
1✔
1306

1307
    #
1308
    # Transactions
1309
    #
1310

1311
    @handler("TransactWriteItems", expand=False)
1✔
1312
    def transact_write_items(
1✔
1313
        self,
1314
        context: RequestContext,
1315
        transact_write_items_input: TransactWriteItemsInput,
1316
    ) -> TransactWriteItemsOutput:
1317
        # TODO: add global table support
1318
        existing_items = {}
1✔
1319
        existing_items_to_fetch: dict[str, list[Put | Update | Delete]] = {}
1✔
1320
        updated_items_to_fetch: dict[str, list[Update]] = {}
1✔
1321
        transact_items = transact_write_items_input["TransactItems"]
1✔
1322
        tables_stream_type: dict[TableName, TableStreamType] = {}
1✔
1323
        no_stream_tables = set()
1✔
1324

1325
        for item in transact_items:
1✔
1326
            item: TransactWriteItem
1327
            for key in ["Put", "Update", "Delete"]:
1✔
1328
                inner_item: Put | Delete | Update = item.get(key)
1✔
1329
                if inner_item:
1✔
1330
                    table_name = inner_item["TableName"]
1✔
1331
                    # if we've seen the table already and it does not have streams, skip
1332
                    if table_name in no_stream_tables:
1✔
1333
                        continue
1✔
1334

1335
                    # if we have not seen the table, fetch its streaming status
1336
                    if table_name not in tables_stream_type:
1✔
1337
                        if stream_type := get_table_stream_type(
1✔
1338
                            context.account_id, context.region, table_name
1339
                        ):
1340
                            tables_stream_type[table_name] = stream_type
1✔
1341
                        else:
1342
                            # no stream,
1343
                            no_stream_tables.add(table_name)
1✔
1344
                            continue
1✔
1345

1346
                    existing_items_to_fetch_for_table = existing_items_to_fetch.setdefault(
1✔
1347
                        table_name, []
1348
                    )
1349
                    existing_items_to_fetch_for_table.append(inner_item)
1✔
1350
                    if key == "Update":
1✔
1351
                        updated_items_to_fetch_for_table = updated_items_to_fetch.setdefault(
1✔
1352
                            table_name, []
1353
                        )
1354
                        updated_items_to_fetch_for_table.append(inner_item)
1✔
1355

1356
                    continue
1✔
1357

1358
        if existing_items_to_fetch:
1✔
1359
            existing_items = ItemFinder.find_existing_items(
1✔
1360
                put_items_per_table=existing_items_to_fetch,
1361
                account_id=context.account_id,
1362
                region_name=context.region,
1363
                endpoint_url=self.server.url,
1364
            )
1365

1366
        client_token: str | None = transact_write_items_input.get("ClientRequestToken")
1✔
1367

1368
        if client_token:
1✔
1369
            # we sort the payload since identical payload but with different order could cause
1370
            # IdempotentParameterMismatchException error if a client token is provided
1371
            context.request.data = to_bytes(canonical_json(json.loads(context.request.data)))
1✔
1372

1373
        result = self.forward_request(context)
1✔
1374

1375
        # determine and forward stream records
1376
        if tables_stream_type:
1✔
1377
            updated_items = (
1✔
1378
                ItemFinder.find_existing_items(
1379
                    put_items_per_table=existing_items_to_fetch,
1380
                    account_id=context.account_id,
1381
                    region_name=context.region,
1382
                    endpoint_url=self.server.url,
1383
                )
1384
                if updated_items_to_fetch
1385
                else {}
1386
            )
1387

1388
            records_map = self.prepare_transact_write_item_records(
1✔
1389
                account_id=context.account_id,
1390
                region_name=context.region,
1391
                transact_items=transact_items,
1392
                existing_items=existing_items,
1393
                updated_items=updated_items,
1394
                tables_stream_type=tables_stream_type,
1395
            )
1396
            self.forward_stream_records(context.account_id, context.region, records_map)
1✔
1397

1398
        return result
1✔
1399

1400
    @handler("TransactGetItems", expand=False)
1✔
1401
    def transact_get_items(
1✔
1402
        self,
1403
        context: RequestContext,
1404
        transact_items: TransactGetItemList,
1405
        return_consumed_capacity: ReturnConsumedCapacity = None,
1406
    ) -> TransactGetItemsOutput:
1407
        return self.forward_request(context)
1✔
1408

1409
    @handler("ExecuteTransaction", expand=False)
1✔
1410
    def execute_transaction(
1✔
1411
        self, context: RequestContext, execute_transaction_input: ExecuteTransactionInput
1412
    ) -> ExecuteTransactionOutput:
1413
        result = self.forward_request(context)
1✔
1414
        return result
1✔
1415

1416
    @handler("ExecuteStatement", expand=False)
1✔
1417
    def execute_statement(
1✔
1418
        self,
1419
        context: RequestContext,
1420
        execute_statement_input: ExecuteStatementInput,
1421
    ) -> ExecuteStatementOutput:
1422
        # TODO: this operation is still really slow with streams enabled
1423
        #  find a way to make it better, same way as the other operations, by using returnvalues
1424
        # see https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ql-reference.update.html
1425
        statement = execute_statement_input["Statement"]
1✔
1426
        # We found out that 'Parameters' can be an empty list when the request comes from the AWS JS client.
1427
        if execute_statement_input.get("Parameters", None) == []:  # noqa
1✔
1428
            raise ValidationException(
1✔
1429
                "1 validation error detected: Value '[]' at 'parameters' failed to satisfy constraint: Member must have length greater than or equal to 1"
1430
            )
1431
        table_name = extract_table_name_from_partiql_update(statement)
1✔
1432
        existing_items = None
1✔
1433
        stream_type = table_name and get_table_stream_type(
1✔
1434
            context.account_id, context.region, table_name
1435
        )
1436
        if stream_type:
1✔
1437
            # Note: fetching the entire list of items is hugely inefficient, especially for larger tables
1438
            # TODO: find a mechanism to hook into the PartiQL update mechanism of DynamoDB Local directly!
1439
            existing_items = ItemFinder.list_existing_items_for_statement(
1✔
1440
                partiql_statement=statement,
1441
                account_id=context.account_id,
1442
                region_name=context.region,
1443
                endpoint_url=self.server.url,
1444
            )
1445

1446
        result = self.forward_request(context)
1✔
1447

1448
        # construct and forward stream record
1449
        if stream_type:
1✔
1450
            records = get_updated_records(
1✔
1451
                account_id=context.account_id,
1452
                region_name=context.region,
1453
                table_name=table_name,
1454
                existing_items=existing_items,
1455
                server_url=self.server.url,
1456
                table_stream_type=stream_type,
1457
            )
1458
            self.forward_stream_records(context.account_id, context.region, records)
1✔
1459

1460
        return result
1✔
1461

1462
    #
1463
    # Tags
1464
    #
1465

1466
    def tag_resource(
1✔
1467
        self, context: RequestContext, resource_arn: ResourceArnString, tags: TagList, **kwargs
1468
    ) -> None:
1469
        table_tags = get_store(context.account_id, context.region).TABLE_TAGS
1✔
1470
        if resource_arn not in table_tags:
1✔
1471
            table_tags[resource_arn] = {}
×
1472
        table_tags[resource_arn].update({tag["Key"]: tag["Value"] for tag in tags})
1✔
1473

1474
    def untag_resource(
1✔
1475
        self,
1476
        context: RequestContext,
1477
        resource_arn: ResourceArnString,
1478
        tag_keys: TagKeyList,
1479
        **kwargs,
1480
    ) -> None:
1481
        for tag_key in tag_keys or []:
1✔
1482
            get_store(context.account_id, context.region).TABLE_TAGS.get(resource_arn, {}).pop(
1✔
1483
                tag_key, None
1484
            )
1485

1486
    def list_tags_of_resource(
1✔
1487
        self,
1488
        context: RequestContext,
1489
        resource_arn: ResourceArnString,
1490
        next_token: NextTokenString = None,
1491
        **kwargs,
1492
    ) -> ListTagsOfResourceOutput:
1493
        result = [
1✔
1494
            {"Key": k, "Value": v}
1495
            for k, v in get_store(context.account_id, context.region)
1496
            .TABLE_TAGS.get(resource_arn, {})
1497
            .items()
1498
        ]
1499
        return ListTagsOfResourceOutput(Tags=result)
1✔
1500

1501
    #
1502
    # TTLs
1503
    #
1504

1505
    def describe_time_to_live(
1✔
1506
        self, context: RequestContext, table_name: TableName, **kwargs
1507
    ) -> DescribeTimeToLiveOutput:
1508
        if not self.table_exists(context.account_id, context.region, table_name):
1✔
1509
            raise ResourceNotFoundException(
1✔
1510
                f"Requested resource not found: Table: {table_name} not found"
1511
            )
1512

1513
        backend = get_store(context.account_id, context.region)
1✔
1514
        ttl_spec = backend.ttl_specifications.get(table_name)
1✔
1515

1516
        result = {"TimeToLiveStatus": "DISABLED"}
1✔
1517
        if ttl_spec:
1✔
1518
            if ttl_spec.get("Enabled"):
1✔
1519
                ttl_status = "ENABLED"
1✔
1520
            else:
1521
                ttl_status = "DISABLED"
1✔
1522
            result = {
1✔
1523
                "AttributeName": ttl_spec.get("AttributeName"),
1524
                "TimeToLiveStatus": ttl_status,
1525
            }
1526

1527
        return DescribeTimeToLiveOutput(TimeToLiveDescription=result)
1✔
1528

1529
    def update_time_to_live(
1✔
1530
        self,
1531
        context: RequestContext,
1532
        table_name: TableName,
1533
        time_to_live_specification: TimeToLiveSpecification,
1534
        **kwargs,
1535
    ) -> UpdateTimeToLiveOutput:
1536
        if not self.table_exists(context.account_id, context.region, table_name):
1✔
1537
            raise ResourceNotFoundException(
1✔
1538
                f"Requested resource not found: Table: {table_name} not found"
1539
            )
1540

1541
        # TODO: TTL status is maintained/mocked but no real expiry is happening for items
1542
        backend = get_store(context.account_id, context.region)
1✔
1543
        backend.ttl_specifications[table_name] = time_to_live_specification
1✔
1544
        return UpdateTimeToLiveOutput(TimeToLiveSpecification=time_to_live_specification)
1✔
1545

1546
    #
1547
    # Global tables
1548
    #
1549

1550
    def create_global_table(
1✔
1551
        self,
1552
        context: RequestContext,
1553
        global_table_name: TableName,
1554
        replication_group: ReplicaList,
1555
        **kwargs,
1556
    ) -> CreateGlobalTableOutput:
1557
        global_tables: dict = get_store(context.account_id, context.region).GLOBAL_TABLES
1✔
1558
        if global_table_name in global_tables:
1✔
1559
            raise GlobalTableAlreadyExistsException("Global table with this name already exists")
1✔
1560
        replication_group = [grp.copy() for grp in replication_group or []]
1✔
1561
        data = {"GlobalTableName": global_table_name, "ReplicationGroup": replication_group}
1✔
1562
        global_tables[global_table_name] = data
1✔
1563
        for group in replication_group:
1✔
1564
            group["ReplicaStatus"] = "ACTIVE"
1✔
1565
            group["ReplicaStatusDescription"] = "Replica active"
1✔
1566
        return CreateGlobalTableOutput(GlobalTableDescription=data)
1✔
1567

1568
    def describe_global_table(
1✔
1569
        self, context: RequestContext, global_table_name: TableName, **kwargs
1570
    ) -> DescribeGlobalTableOutput:
1571
        details = get_store(context.account_id, context.region).GLOBAL_TABLES.get(global_table_name)
1✔
1572
        if not details:
1✔
1573
            raise GlobalTableNotFoundException("Global table with this name does not exist")
1✔
1574
        return DescribeGlobalTableOutput(GlobalTableDescription=details)
1✔
1575

1576
    def list_global_tables(
1✔
1577
        self,
1578
        context: RequestContext,
1579
        exclusive_start_global_table_name: TableName = None,
1580
        limit: PositiveIntegerObject = None,
1581
        region_name: RegionName = None,
1582
        **kwargs,
1583
    ) -> ListGlobalTablesOutput:
1584
        # TODO: add paging support
1585
        result = [
×
1586
            select_attributes(tab, ["GlobalTableName", "ReplicationGroup"])
1587
            for tab in get_store(context.account_id, context.region).GLOBAL_TABLES.values()
1588
        ]
1589
        return ListGlobalTablesOutput(GlobalTables=result)
×
1590

1591
    def update_global_table(
1✔
1592
        self,
1593
        context: RequestContext,
1594
        global_table_name: TableName,
1595
        replica_updates: ReplicaUpdateList,
1596
        **kwargs,
1597
    ) -> UpdateGlobalTableOutput:
1598
        details = get_store(context.account_id, context.region).GLOBAL_TABLES.get(global_table_name)
1✔
1599
        if not details:
1✔
1600
            raise GlobalTableNotFoundException("Global table with this name does not exist")
×
1601
        for update in replica_updates or []:
1✔
1602
            repl_group = details["ReplicationGroup"]
1✔
1603
            # delete existing
1604
            delete = update.get("Delete")
1✔
1605
            if delete:
1✔
1606
                details["ReplicationGroup"] = [
1✔
1607
                    g for g in repl_group if g["RegionName"] != delete["RegionName"]
1608
                ]
1609
            # create new
1610
            create = update.get("Create")
1✔
1611
            if create:
1✔
1612
                exists = [g for g in repl_group if g["RegionName"] == create["RegionName"]]
1✔
1613
                if exists:
1✔
1614
                    continue
×
1615
                new_group = {
1✔
1616
                    "RegionName": create["RegionName"],
1617
                    "ReplicaStatus": "ACTIVE",
1618
                    "ReplicaStatusDescription": "Replica active",
1619
                }
1620
                details["ReplicationGroup"].append(new_group)
1✔
1621
        return UpdateGlobalTableOutput(GlobalTableDescription=details)
1✔
1622

1623
    #
1624
    # Kinesis Streaming
1625
    #
1626

1627
    def enable_kinesis_streaming_destination(
1✔
1628
        self,
1629
        context: RequestContext,
1630
        table_name: TableName,
1631
        stream_arn: StreamArn,
1632
        enable_kinesis_streaming_configuration: EnableKinesisStreamingConfiguration = None,
1633
        **kwargs,
1634
    ) -> KinesisStreamingDestinationOutput:
1635
        self.ensure_table_exists(
1✔
1636
            context.account_id,
1637
            context.region,
1638
            table_name,
1639
            error_message=f"Requested resource not found: Table: {table_name} not found",
1640
        )
1641

1642
        # TODO: Use the time precision in config if set
1643
        enable_kinesis_streaming_configuration = enable_kinesis_streaming_configuration or {}
1✔
1644

1645
        stream = self._event_forwarder.is_kinesis_stream_exists(stream_arn=stream_arn)
1✔
1646
        if not stream:
1✔
1647
            raise ValidationException("User does not have a permission to use kinesis stream")
×
1648

1649
        table_def = get_store(context.account_id, context.region).table_definitions.setdefault(
1✔
1650
            table_name, {}
1651
        )
1652

1653
        dest_status = table_def.get("KinesisDataStreamDestinationStatus")
1✔
1654
        if dest_status not in ["DISABLED", "ENABLE_FAILED", None]:
1✔
1655
            raise ValidationException(
×
1656
                "Table is not in a valid state to enable Kinesis Streaming "
1657
                "Destination:EnableKinesisStreamingDestination must be DISABLED or ENABLE_FAILED "
1658
                "to perform ENABLE operation."
1659
            )
1660

1661
        table_def.setdefault("KinesisDataStreamDestinations", [])
1✔
1662

1663
        # remove the stream destination if already present
1664
        table_def["KinesisDataStreamDestinations"] = [
1✔
1665
            t for t in table_def["KinesisDataStreamDestinations"] if t["StreamArn"] != stream_arn
1666
        ]
1667
        # append the active stream destination at the end of the list
1668
        table_def["KinesisDataStreamDestinations"].append(
1✔
1669
            {
1670
                "DestinationStatus": DestinationStatus.ACTIVE,
1671
                "DestinationStatusDescription": "Stream is active",
1672
                "StreamArn": stream_arn,
1673
                "ApproximateCreationDateTimePrecision": ApproximateCreationDateTimePrecision.MILLISECOND,
1674
            }
1675
        )
1676
        table_def["KinesisDataStreamDestinationStatus"] = DestinationStatus.ACTIVE
1✔
1677
        return KinesisStreamingDestinationOutput(
1✔
1678
            DestinationStatus=DestinationStatus.ENABLING,
1679
            StreamArn=stream_arn,
1680
            TableName=table_name,
1681
            EnableKinesisStreamingConfiguration=enable_kinesis_streaming_configuration,
1682
        )
1683

1684
    def disable_kinesis_streaming_destination(
1✔
1685
        self,
1686
        context: RequestContext,
1687
        table_name: TableName,
1688
        stream_arn: StreamArn,
1689
        enable_kinesis_streaming_configuration: EnableKinesisStreamingConfiguration = None,
1690
        **kwargs,
1691
    ) -> KinesisStreamingDestinationOutput:
1692
        self.ensure_table_exists(
1✔
1693
            context.account_id,
1694
            context.region,
1695
            table_name,
1696
            error_message=f"Requested resource not found: Table: {table_name} not found",
1697
        )
1698

1699
        # TODO: Must raise if invoked before KinesisStreamingDestination is ACTIVE
1700

1701
        stream = self._event_forwarder.is_kinesis_stream_exists(stream_arn=stream_arn)
1✔
1702
        if not stream:
1✔
1703
            raise ValidationException(
×
1704
                "User does not have a permission to use kinesis stream",
1705
            )
1706

1707
        table_def = get_store(context.account_id, context.region).table_definitions.setdefault(
1✔
1708
            table_name, {}
1709
        )
1710

1711
        stream_destinations = table_def.get("KinesisDataStreamDestinations")
1✔
1712
        if stream_destinations:
1✔
1713
            if table_def["KinesisDataStreamDestinationStatus"] == DestinationStatus.ACTIVE:
1✔
1714
                for dest in stream_destinations:
1✔
1715
                    if (
1✔
1716
                        dest["StreamArn"] == stream_arn
1717
                        and dest["DestinationStatus"] == DestinationStatus.ACTIVE
1718
                    ):
1719
                        dest["DestinationStatus"] = DestinationStatus.DISABLED
1✔
1720
                        dest["DestinationStatusDescription"] = ("Stream is disabled",)
1✔
1721
                        table_def["KinesisDataStreamDestinationStatus"] = DestinationStatus.DISABLED
1✔
1722
                        return KinesisStreamingDestinationOutput(
1✔
1723
                            DestinationStatus=DestinationStatus.DISABLING,
1724
                            StreamArn=stream_arn,
1725
                            TableName=table_name,
1726
                        )
1727
        raise ValidationException(
×
1728
            "Table is not in a valid state to disable Kinesis Streaming Destination:"
1729
            "DisableKinesisStreamingDestination must be ACTIVE to perform DISABLE operation."
1730
        )
1731

1732
    def describe_kinesis_streaming_destination(
1✔
1733
        self, context: RequestContext, table_name: TableName, **kwargs
1734
    ) -> DescribeKinesisStreamingDestinationOutput:
1735
        self.ensure_table_exists(context.account_id, context.region, table_name)
1✔
1736

1737
        table_def = (
1✔
1738
            get_store(context.account_id, context.region).table_definitions.get(table_name) or {}
1739
        )
1740

1741
        stream_destinations = table_def.get("KinesisDataStreamDestinations") or []
1✔
1742
        stream_destinations = copy.deepcopy(stream_destinations)
1✔
1743

1744
        for destination in stream_destinations:
1✔
1745
            destination.pop("ApproximateCreationDateTimePrecision", None)
1✔
1746
            destination.pop("DestinationStatusDescription", None)
1✔
1747

1748
        return DescribeKinesisStreamingDestinationOutput(
1✔
1749
            KinesisDataStreamDestinations=stream_destinations,
1750
            TableName=table_name,
1751
        )
1752

1753
    def update_kinesis_streaming_destination(
1✔
1754
        self,
1755
        context: RequestContext,
1756
        table_name: TableArn,
1757
        stream_arn: StreamArn,
1758
        update_kinesis_streaming_configuration: UpdateKinesisStreamingConfiguration | None = None,
1759
        **kwargs,
1760
    ) -> UpdateKinesisStreamingDestinationOutput:
1761
        self.ensure_table_exists(context.account_id, context.region, table_name)
1✔
1762

1763
        if not update_kinesis_streaming_configuration:
1✔
1764
            raise ValidationException(
1✔
1765
                "Streaming destination cannot be updated with given parameters: "
1766
                "UpdateKinesisStreamingConfiguration cannot be null or contain only null values"
1767
            )
1768

1769
        time_precision = update_kinesis_streaming_configuration.get(
1✔
1770
            "ApproximateCreationDateTimePrecision"
1771
        )
1772
        if time_precision not in (
1✔
1773
            ApproximateCreationDateTimePrecision.MILLISECOND,
1774
            ApproximateCreationDateTimePrecision.MICROSECOND,
1775
        ):
1776
            raise ValidationException(
1✔
1777
                f"1 validation error detected: Value '{time_precision}' at "
1778
                "'updateKinesisStreamingConfiguration.approximateCreationDateTimePrecision' failed to satisfy constraint: "
1779
                "Member must satisfy enum value set: [MILLISECOND, MICROSECOND]"
1780
            )
1781

1782
        store = get_store(context.account_id, context.region)
1✔
1783

1784
        table_def = store.table_definitions.get(table_name) or {}
1✔
1785
        table_def.setdefault("KinesisDataStreamDestinations", [])
1✔
1786

1787
        table_id = table_def["TableId"]
1✔
1788

1789
        destination = None
1✔
1790
        for stream in table_def["KinesisDataStreamDestinations"]:
1✔
1791
            if stream["StreamArn"] == stream_arn:
1✔
1792
                destination = stream
1✔
1793

1794
        if destination is None:
1✔
1795
            raise ValidationException(
1✔
1796
                "Table is not in a valid state to enable Kinesis Streaming Destination: "
1797
                f"No streaming destination with streamArn: {stream_arn} found for table with tableName: {table_name}"
1798
            )
1799

1800
        if (
1✔
1801
            existing_precision := destination["ApproximateCreationDateTimePrecision"]
1802
        ) == update_kinesis_streaming_configuration["ApproximateCreationDateTimePrecision"]:
1803
            raise ValidationException(
1✔
1804
                f"Invalid Request: Precision is already set to the desired value of {existing_precision} "
1805
                f"for tableId: {table_id}, kdsArn: {stream_arn}"
1806
            )
1807

1808
        destination["ApproximateCreationDateTimePrecision"] = time_precision
1✔
1809

1810
        return UpdateKinesisStreamingDestinationOutput(
1✔
1811
            TableName=table_name,
1812
            StreamArn=stream_arn,
1813
            DestinationStatus=DestinationStatus.UPDATING,
1814
            UpdateKinesisStreamingConfiguration=UpdateKinesisStreamingConfiguration(
1815
                ApproximateCreationDateTimePrecision=time_precision,
1816
            ),
1817
        )
1818

1819
    #
1820
    # Continuous Backups
1821
    #
1822

1823
    def describe_continuous_backups(
1✔
1824
        self, context: RequestContext, table_name: TableName, **kwargs
1825
    ) -> DescribeContinuousBackupsOutput:
1826
        self.get_global_table_region(context, table_name)
1✔
1827
        store = get_store(context.account_id, context.region)
1✔
1828
        continuous_backup_description = (
1✔
1829
            store.table_properties.get(table_name, {}).get("ContinuousBackupsDescription")
1830
        ) or ContinuousBackupsDescription(
1831
            ContinuousBackupsStatus=ContinuousBackupsStatus.ENABLED,
1832
            PointInTimeRecoveryDescription=PointInTimeRecoveryDescription(
1833
                PointInTimeRecoveryStatus=PointInTimeRecoveryStatus.DISABLED
1834
            ),
1835
        )
1836

1837
        return DescribeContinuousBackupsOutput(
1✔
1838
            ContinuousBackupsDescription=continuous_backup_description
1839
        )
1840

1841
    def update_continuous_backups(
1✔
1842
        self,
1843
        context: RequestContext,
1844
        table_name: TableName,
1845
        point_in_time_recovery_specification: PointInTimeRecoverySpecification,
1846
        **kwargs,
1847
    ) -> UpdateContinuousBackupsOutput:
1848
        self.get_global_table_region(context, table_name)
1✔
1849

1850
        store = get_store(context.account_id, context.region)
1✔
1851
        pit_recovery_status = (
1✔
1852
            PointInTimeRecoveryStatus.ENABLED
1853
            if point_in_time_recovery_specification["PointInTimeRecoveryEnabled"]
1854
            else PointInTimeRecoveryStatus.DISABLED
1855
        )
1856
        continuous_backup_description = ContinuousBackupsDescription(
1✔
1857
            ContinuousBackupsStatus=ContinuousBackupsStatus.ENABLED,
1858
            PointInTimeRecoveryDescription=PointInTimeRecoveryDescription(
1859
                PointInTimeRecoveryStatus=pit_recovery_status
1860
            ),
1861
        )
1862
        table_props = store.table_properties.setdefault(table_name, {})
1✔
1863
        table_props["ContinuousBackupsDescription"] = continuous_backup_description
1✔
1864

1865
        return UpdateContinuousBackupsOutput(
1✔
1866
            ContinuousBackupsDescription=continuous_backup_description
1867
        )
1868

1869
    #
1870
    # Helpers
1871
    #
1872

1873
    @staticmethod
1✔
1874
    def ddb_region_name(region_name: str) -> str:
1✔
1875
        """Map `local` or `localhost` region to the us-east-1 region. These values are used by NoSQL Workbench."""
1876
        # TODO: could this be somehow moved into the request handler chain?
1877
        if region_name in ("local", "localhost"):
1✔
1878
            region_name = AWS_REGION_US_EAST_1
×
1879

1880
        return region_name
1✔
1881

1882
    @staticmethod
1✔
1883
    def table_exists(account_id: str, region_name: str, table_name: str) -> bool:
1✔
1884
        region_name = DynamoDBProvider.ddb_region_name(region_name)
1✔
1885

1886
        client = connect_to(
1✔
1887
            aws_access_key_id=account_id,
1888
            aws_secret_access_key=INTERNAL_AWS_SECRET_ACCESS_KEY,
1889
            region_name=region_name,
1890
        ).dynamodb
1891
        return dynamodb_table_exists(table_name, client)
1✔
1892

1893
    @staticmethod
1✔
1894
    def ensure_table_exists(
1✔
1895
        account_id: str,
1896
        region_name: str,
1897
        table_name: str,
1898
        error_message: str = "Cannot do operations on a non-existent table",
1899
    ):
1900
        """
1901
        Raise ResourceNotFoundException if the given table does not exist.
1902

1903
        :param account_id: account id
1904
        :param region_name: region name
1905
        :param table_name: table name
1906
        :raise: ResourceNotFoundException if table does not exist in DynamoDB Local
1907
        """
1908
        if not DynamoDBProvider.table_exists(account_id, region_name, table_name):
1✔
1909
            raise ResourceNotFoundException(error_message)
1✔
1910

1911
    @staticmethod
1✔
1912
    def get_global_table_region(context: RequestContext, table_name: str) -> str:
1✔
1913
        """
1914
        Return the table region considering that it might be a replicated table.
1915

1916
        Replication in LocalStack works by keeping a single copy of a table and forwarding
1917
        requests to the region where this table exists.
1918

1919
        This method does not check whether the table actually exists in DDBLocal.
1920

1921
        :param context: request context
1922
        :param table_name: table name
1923
        :return: region
1924
        """
1925
        store = get_store(context.account_id, context.region)
1✔
1926

1927
        table_region = store.TABLE_REGION.get(table_name)
1✔
1928
        replicated_at = store.REPLICAS.get(table_name, {}).keys()
1✔
1929

1930
        if context.region == table_region or context.region in replicated_at:
1✔
1931
            return table_region
1✔
1932

1933
        return context.region
1✔
1934

1935
    @staticmethod
1✔
1936
    def prepare_request_headers(headers: dict, account_id: str, region_name: str):
1✔
1937
        """
1938
        Modify the Credentials field of Authorization header to achieve namespacing in DynamoDBLocal.
1939
        """
1940
        region_name = DynamoDBProvider.ddb_region_name(region_name)
1✔
1941
        key = get_ddb_access_key(account_id, region_name)
1✔
1942

1943
        # DynamoDBLocal namespaces based on the value of Credentials
1944
        # Since we want to namespace by both account ID and region, use an aggregate key
1945
        # We also replace the region to keep compatibility with NoSQL Workbench
1946
        headers["Authorization"] = re.sub(
1✔
1947
            AUTH_CREDENTIAL_REGEX,
1948
            rf"Credential={key}/\2/{region_name}/\4/",
1949
            headers.get("Authorization") or "",
1950
            flags=re.IGNORECASE,
1951
        )
1952

1953
    def fix_consumed_capacity(self, request: dict, result: dict):
1✔
1954
        # make sure we append 'ConsumedCapacity', which is properly
1955
        # returned by dynalite, but not by AWS's DynamoDBLocal
1956
        table_name = request.get("TableName")
1✔
1957
        return_cap = request.get("ReturnConsumedCapacity")
1✔
1958
        if "ConsumedCapacity" not in result and return_cap in ["TOTAL", "INDEXES"]:
1✔
1959
            request["ConsumedCapacity"] = {
×
1960
                "TableName": table_name,
1961
                "CapacityUnits": 5,  # TODO hardcoded
1962
                "ReadCapacityUnits": 2,
1963
                "WriteCapacityUnits": 3,
1964
            }
1965

1966
    def fix_table_arn(self, account_id: str, region_name: str, arn: str) -> str:
1✔
1967
        """
1968
        Set the correct account ID and region in ARNs returned by DynamoDB Local.
1969
        """
1970
        partition = get_partition(region_name)
1✔
1971
        return (
1✔
1972
            arn.replace("arn:aws:", f"arn:{partition}:")
1973
            .replace(":ddblocal:", f":{region_name}:")
1974
            .replace(":000000000000:", f":{account_id}:")
1975
        )
1976

1977
    def prepare_transact_write_item_records(
1✔
1978
        self,
1979
        account_id: str,
1980
        region_name: str,
1981
        transact_items: TransactWriteItemList,
1982
        existing_items: BatchGetResponseMap,
1983
        updated_items: BatchGetResponseMap,
1984
        tables_stream_type: dict[TableName, TableStreamType],
1985
    ) -> RecordsMap:
1986
        records_only_map: dict[TableName, StreamRecords] = defaultdict(list)
1✔
1987

1988
        for request in transact_items:
1✔
1989
            record = self.get_record_template(region_name)
1✔
1990
            match request:
1✔
1991
                case {"Put": {"TableName": table_name, "Item": new_item}}:
1✔
1992
                    if not (stream_type := tables_stream_type.get(table_name)):
1✔
1993
                        continue
1✔
1994
                    keys = SchemaExtractor.extract_keys(
1✔
1995
                        item=new_item,
1996
                        table_name=table_name,
1997
                        account_id=account_id,
1998
                        region_name=region_name,
1999
                    )
2000
                    existing_item = find_item_for_keys_values_in_batch(
1✔
2001
                        table_name, keys, existing_items
2002
                    )
2003
                    if existing_item == new_item:
1✔
2004
                        continue
1✔
2005

2006
                    if stream_type.stream_view_type:
1✔
2007
                        record["dynamodb"]["StreamViewType"] = stream_type.stream_view_type
1✔
2008

2009
                    record["eventID"] = short_uid()
1✔
2010
                    record["eventName"] = "INSERT" if not existing_item else "MODIFY"
1✔
2011
                    record["dynamodb"]["Keys"] = keys
1✔
2012
                    if stream_type.needs_new_image:
1✔
2013
                        record["dynamodb"]["NewImage"] = new_item
1✔
2014
                    if existing_item and stream_type.needs_old_image:
1✔
2015
                        record["dynamodb"]["OldImage"] = existing_item
1✔
2016

2017
                    record_item = de_dynamize_record(new_item)
1✔
2018
                    record["dynamodb"]["SizeBytes"] = _get_size_bytes(record_item)
1✔
2019
                    records_only_map[table_name].append(record)
1✔
2020
                    continue
1✔
2021

2022
                case {"Update": {"TableName": table_name, "Key": keys}}:
1✔
2023
                    if not (stream_type := tables_stream_type.get(table_name)):
1✔
2024
                        continue
×
2025
                    updated_item = find_item_for_keys_values_in_batch(
1✔
2026
                        table_name, keys, updated_items
2027
                    )
2028
                    if not updated_item:
1✔
2029
                        continue
×
2030

2031
                    existing_item = find_item_for_keys_values_in_batch(
1✔
2032
                        table_name, keys, existing_items
2033
                    )
2034
                    if existing_item == updated_item:
1✔
2035
                        # if the item is the same as the previous version, AWS does not send an event
2036
                        continue
1✔
2037

2038
                    if stream_type.stream_view_type:
1✔
2039
                        record["dynamodb"]["StreamViewType"] = stream_type.stream_view_type
1✔
2040

2041
                    record["eventID"] = short_uid()
1✔
2042
                    record["eventName"] = "MODIFY" if existing_item else "INSERT"
1✔
2043
                    record["dynamodb"]["Keys"] = keys
1✔
2044

2045
                    if existing_item and stream_type.needs_old_image:
1✔
2046
                        record["dynamodb"]["OldImage"] = existing_item
1✔
2047
                    if stream_type.needs_new_image:
1✔
2048
                        record["dynamodb"]["NewImage"] = updated_item
1✔
2049

2050
                    record["dynamodb"]["SizeBytes"] = _get_size_bytes(updated_item)
1✔
2051
                    records_only_map[table_name].append(record)
1✔
2052
                    continue
1✔
2053

2054
                case {"Delete": {"TableName": table_name, "Key": keys}}:
1✔
2055
                    if not (stream_type := tables_stream_type.get(table_name)):
1✔
2056
                        continue
×
2057

2058
                    existing_item = find_item_for_keys_values_in_batch(
1✔
2059
                        table_name, keys, existing_items
2060
                    )
2061
                    if not existing_item:
1✔
2062
                        continue
×
2063

2064
                    if stream_type.stream_view_type:
1✔
2065
                        record["dynamodb"]["StreamViewType"] = stream_type.stream_view_type
1✔
2066

2067
                    record["eventID"] = short_uid()
1✔
2068
                    record["eventName"] = "REMOVE"
1✔
2069
                    record["dynamodb"]["Keys"] = keys
1✔
2070
                    if stream_type.needs_old_image:
1✔
2071
                        record["dynamodb"]["OldImage"] = existing_item
1✔
2072
                    record_item = de_dynamize_record(existing_item)
1✔
2073
                    record["dynamodb"]["SizeBytes"] = _get_size_bytes(record_item)
1✔
2074

2075
                    records_only_map[table_name].append(record)
1✔
2076
                    continue
1✔
2077

2078
        records_map = {
1✔
2079
            table_name: TableRecords(
2080
                records=records, table_stream_type=tables_stream_type[table_name]
2081
            )
2082
            for table_name, records in records_only_map.items()
2083
        }
2084

2085
        return records_map
1✔
2086

2087
    def batch_execute_statement(
1✔
2088
        self,
2089
        context: RequestContext,
2090
        statements: PartiQLBatchRequest,
2091
        return_consumed_capacity: ReturnConsumedCapacity = None,
2092
        **kwargs,
2093
    ) -> BatchExecuteStatementOutput:
2094
        result = self.forward_request(context)
1✔
2095
        return result
1✔
2096

2097
    def prepare_batch_write_item_records(
1✔
2098
        self,
2099
        account_id: str,
2100
        region_name: str,
2101
        tables_stream_type: dict[TableName, TableStreamType],
2102
        request_items: BatchWriteItemRequestMap,
2103
        existing_items: BatchGetResponseMap,
2104
    ) -> RecordsMap:
2105
        records_map: RecordsMap = {}
1✔
2106

2107
        # only iterate over tables with streams
2108
        for table_name, stream_type in tables_stream_type.items():
1✔
2109
            existing_items_for_table_unordered = existing_items.get(table_name, [])
1✔
2110
            table_records: StreamRecords = []
1✔
2111

2112
            def find_existing_item_for_keys_values(item_keys: dict) -> AttributeMap | None:
1✔
2113
                """
2114
                This function looks up in the existing items for the provided item keys subset. If present, returns the
2115
                full item.
2116
                :param item_keys: the request item keys
2117
                :return:
2118
                """
2119
                keys_items = item_keys.items()
1✔
2120
                for item in existing_items_for_table_unordered:
1✔
2121
                    if keys_items <= item.items():
1✔
2122
                        return item
1✔
2123

2124
            for write_request in request_items[table_name]:
1✔
2125
                record = self.get_record_template(
1✔
2126
                    region_name,
2127
                    stream_view_type=stream_type.stream_view_type,
2128
                )
2129
                match write_request:
1✔
2130
                    case {"PutRequest": request}:
1✔
2131
                        keys = SchemaExtractor.extract_keys(
1✔
2132
                            item=request["Item"],
2133
                            table_name=table_name,
2134
                            account_id=account_id,
2135
                            region_name=region_name,
2136
                        )
2137
                        # we need to find if there was an existing item even if we don't need it for `OldImage`, because
2138
                        # of the `eventName`
2139
                        existing_item = find_existing_item_for_keys_values(keys)
1✔
2140
                        if existing_item == request["Item"]:
1✔
2141
                            # if the item is the same as the previous version, AWS does not send an event
2142
                            continue
1✔
2143
                        record["eventID"] = short_uid()
1✔
2144
                        record["dynamodb"]["SizeBytes"] = _get_size_bytes(request["Item"])
1✔
2145
                        record["eventName"] = "INSERT" if not existing_item else "MODIFY"
1✔
2146
                        record["dynamodb"]["Keys"] = keys
1✔
2147

2148
                        if stream_type.needs_new_image:
1✔
2149
                            record["dynamodb"]["NewImage"] = request["Item"]
1✔
2150
                        if existing_item and stream_type.needs_old_image:
1✔
2151
                            record["dynamodb"]["OldImage"] = existing_item
1✔
2152

2153
                        table_records.append(record)
1✔
2154
                        continue
1✔
2155

2156
                    case {"DeleteRequest": request}:
1✔
2157
                        keys = request["Key"]
1✔
2158
                        if not (existing_item := find_existing_item_for_keys_values(keys)):
1✔
2159
                            continue
×
2160

2161
                        record["eventID"] = short_uid()
1✔
2162
                        record["eventName"] = "REMOVE"
1✔
2163
                        record["dynamodb"]["Keys"] = keys
1✔
2164
                        if stream_type.needs_old_image:
1✔
2165
                            record["dynamodb"]["OldImage"] = existing_item
1✔
2166
                        record["dynamodb"]["SizeBytes"] = _get_size_bytes(existing_item)
1✔
2167
                        table_records.append(record)
1✔
2168
                        continue
1✔
2169

2170
            records_map[table_name] = TableRecords(
1✔
2171
                records=table_records, table_stream_type=stream_type
2172
            )
2173

2174
        return records_map
1✔
2175

2176
    def forward_stream_records(
1✔
2177
        self,
2178
        account_id: str,
2179
        region_name: str,
2180
        records_map: RecordsMap,
2181
    ) -> None:
2182
        if not records_map:
1✔
2183
            return
×
2184

2185
        self._event_forwarder.forward_to_targets(
1✔
2186
            account_id, region_name, records_map, background=True
2187
        )
2188

2189
    @staticmethod
1✔
2190
    def get_record_template(region_name: str, stream_view_type: str | None = None) -> StreamRecord:
1✔
2191
        record = {
1✔
2192
            "eventID": short_uid(),
2193
            "eventVersion": "1.1",
2194
            "dynamodb": {
2195
                # expects nearest second rounded down
2196
                "ApproximateCreationDateTime": int(time.time()),
2197
                "SizeBytes": -1,
2198
            },
2199
            "awsRegion": region_name,
2200
            "eventSource": "aws:dynamodb",
2201
        }
2202
        if stream_view_type:
1✔
2203
            record["dynamodb"]["StreamViewType"] = stream_view_type
1✔
2204

2205
        return record
1✔
2206

2207
    def check_provisioned_throughput(self, action):
1✔
2208
        """
2209
        Check rate limiting for an API operation and raise an error if provisioned throughput is exceeded.
2210
        """
2211
        if self.should_throttle(action):
1✔
2212
            message = (
1✔
2213
                "The level of configured provisioned throughput for the table was exceeded. "
2214
                + "Consider increasing your provisioning level with the UpdateTable API"
2215
            )
2216
            raise ProvisionedThroughputExceededException(message)
1✔
2217

2218
    def action_should_throttle(self, action, actions):
1✔
2219
        throttled = [f"{ACTION_PREFIX}{a}" for a in actions]
1✔
2220
        return (action in throttled) or (action in actions)
1✔
2221

2222
    def should_throttle(self, action):
1✔
2223
        if (
1✔
2224
            not config.DYNAMODB_READ_ERROR_PROBABILITY
2225
            and not config.DYNAMODB_ERROR_PROBABILITY
2226
            and not config.DYNAMODB_WRITE_ERROR_PROBABILITY
2227
        ):
2228
            # early exit so we don't need to call random()
2229
            return False
1✔
2230

2231
        rand = random.random()
1✔
2232
        if rand < config.DYNAMODB_READ_ERROR_PROBABILITY and self.action_should_throttle(
1✔
2233
            action, READ_THROTTLED_ACTIONS
2234
        ):
2235
            return True
1✔
2236
        elif rand < config.DYNAMODB_WRITE_ERROR_PROBABILITY and self.action_should_throttle(
1✔
2237
            action, WRITE_THROTTLED_ACTIONS
2238
        ):
2239
            return True
1✔
2240
        elif rand < config.DYNAMODB_ERROR_PROBABILITY and self.action_should_throttle(
1✔
2241
            action, THROTTLED_ACTIONS
2242
        ):
2243
            return True
1✔
2244
        return False
1✔
2245

2246

2247
# ---
2248
# Misc. util functions
2249
# ---
2250

2251

2252
def _get_size_bytes(item: dict) -> int:
1✔
2253
    try:
1✔
2254
        size_bytes = len(json.dumps(item, separators=(",", ":")))
1✔
2255
    except TypeError:
1✔
2256
        size_bytes = len(str(item))
1✔
2257
    return size_bytes
1✔
2258

2259

2260
def get_global_secondary_index(account_id: str, region_name: str, table_name: str, index_name: str):
1✔
2261
    schema = SchemaExtractor.get_table_schema(table_name, account_id, region_name)
1✔
2262
    for index in schema["Table"].get("GlobalSecondaryIndexes", []):
1✔
2263
        if index["IndexName"] == index_name:
1✔
2264
            return index
1✔
2265
    raise ResourceNotFoundException("Index not found")
×
2266

2267

2268
def is_local_secondary_index(
1✔
2269
    account_id: str, region_name: str, table_name: str, index_name: str
2270
) -> bool:
2271
    schema = SchemaExtractor.get_table_schema(table_name, account_id, region_name)
1✔
2272
    for index in schema["Table"].get("LocalSecondaryIndexes", []):
1✔
2273
        if index["IndexName"] == index_name:
1✔
2274
            return True
1✔
2275
    return False
1✔
2276

2277

2278
def is_index_query_valid(account_id: str, region_name: str, query_data: dict) -> bool:
1✔
2279
    table_name = to_str(query_data["TableName"])
1✔
2280
    index_name = to_str(query_data["IndexName"])
1✔
2281
    if is_local_secondary_index(account_id, region_name, table_name, index_name):
1✔
2282
        return True
1✔
2283
    index_query_type = query_data.get("Select")
1✔
2284
    index = get_global_secondary_index(account_id, region_name, table_name, index_name)
1✔
2285
    index_projection_type = index.get("Projection").get("ProjectionType")
1✔
2286
    if index_query_type == "ALL_ATTRIBUTES" and index_projection_type != "ALL":
1✔
2287
        return False
1✔
2288
    return True
1✔
2289

2290

2291
def get_table_stream_type(
1✔
2292
    account_id: str, region_name: str, table_name_or_arn: str
2293
) -> TableStreamType | None:
2294
    """
2295
    :param account_id: the account id of the table
2296
    :param region_name: the region of the table
2297
    :param table_name_or_arn: the table name or ARN
2298
    :return: a TableStreamViewType object if the table has streams enabled. If not, return None
2299
    """
2300
    if not table_name_or_arn:
1✔
2301
        return
×
2302

2303
    table_name = table_name_or_arn.split(":table/")[-1]
1✔
2304

2305
    is_kinesis = False
1✔
2306
    stream_view_type = None
1✔
2307

2308
    if table_definition := get_store(account_id, region_name).table_definitions.get(table_name):
1✔
2309
        if table_definition.get("KinesisDataStreamDestinationStatus") == "ACTIVE":
1✔
2310
            is_kinesis = True
1✔
2311

2312
    table_arn = arns.dynamodb_table_arn(table_name, account_id=account_id, region_name=region_name)
1✔
2313

2314
    if (
1✔
2315
        stream := dynamodbstreams_api.get_stream_for_table(account_id, region_name, table_arn)
2316
    ) and stream["StreamStatus"] in (StreamStatus.ENABLING, StreamStatus.ENABLED):
2317
        stream_view_type = stream["StreamViewType"]
1✔
2318

2319
    if is_kinesis or stream_view_type:
1✔
2320
        return TableStreamType(stream_view_type, is_kinesis=is_kinesis)
1✔
2321

2322

2323
def get_updated_records(
1✔
2324
    account_id: str,
2325
    region_name: str,
2326
    table_name: str,
2327
    existing_items: list,
2328
    server_url: str,
2329
    table_stream_type: TableStreamType,
2330
) -> RecordsMap:
2331
    """
2332
    Determine the list of record updates, to be sent to a DDB stream after a PartiQL update operation.
2333

2334
    Note: This is currently a fairly expensive operation, as we need to retrieve the list of all items
2335
          from the table, and compare the items to the previously available. This is a limitation as
2336
          we're currently using the DynamoDB Local backend as a blackbox. In future, we should consider hooking
2337
          into the PartiQL query execution inside DynamoDB Local and directly extract the list of updated items.
2338
    """
2339
    result = []
1✔
2340

2341
    key_schema = SchemaExtractor.get_key_schema(table_name, account_id, region_name)
1✔
2342
    before = ItemSet(existing_items, key_schema=key_schema)
1✔
2343
    all_table_items = ItemFinder.get_all_table_items(
1✔
2344
        account_id=account_id,
2345
        region_name=region_name,
2346
        table_name=table_name,
2347
        endpoint_url=server_url,
2348
    )
2349
    after = ItemSet(all_table_items, key_schema=key_schema)
1✔
2350

2351
    def _add_record(item, comparison_set: ItemSet):
1✔
2352
        matching_item = comparison_set.find_item(item)
1✔
2353
        if matching_item == item:
1✔
2354
            return
×
2355

2356
        # determine event type
2357
        if comparison_set == after:
1✔
2358
            if matching_item:
1✔
2359
                return
1✔
2360
            event_name = "REMOVE"
1✔
2361
        else:
2362
            event_name = "INSERT" if not matching_item else "MODIFY"
1✔
2363

2364
        old_image = item if event_name == "REMOVE" else matching_item
1✔
2365
        new_image = matching_item if event_name == "REMOVE" else item
1✔
2366

2367
        # prepare record
2368
        keys = SchemaExtractor.extract_keys_for_schema(item=item, key_schema=key_schema)
1✔
2369

2370
        record = DynamoDBProvider.get_record_template(region_name)
1✔
2371
        record["eventName"] = event_name
1✔
2372
        record["dynamodb"]["Keys"] = keys
1✔
2373
        record["dynamodb"]["SizeBytes"] = _get_size_bytes(item)
1✔
2374

2375
        if table_stream_type.stream_view_type:
1✔
2376
            record["dynamodb"]["StreamViewType"] = table_stream_type.stream_view_type
1✔
2377
        if table_stream_type.needs_new_image:
1✔
2378
            record["dynamodb"]["NewImage"] = new_image
×
2379
        if old_image and table_stream_type.needs_old_image:
1✔
2380
            record["dynamodb"]["OldImage"] = old_image
×
2381

2382
        result.append(record)
1✔
2383

2384
    # loop over items in new item list (find INSERT/MODIFY events)
2385
    for item in after.items_list:
1✔
2386
        _add_record(item, before)
1✔
2387
    # loop over items in old item list (find REMOVE events)
2388
    for item in before.items_list:
1✔
2389
        _add_record(item, after)
1✔
2390

2391
    return {table_name: TableRecords(records=result, table_stream_type=table_stream_type)}
1✔
2392

2393

2394
def create_dynamodb_stream(account_id: str, region_name: str, data, latest_stream_label):
1✔
2395
    stream = data["StreamSpecification"]
1✔
2396
    enabled = stream.get("StreamEnabled")
1✔
2397

2398
    if enabled not in [False, "False"]:
1✔
2399
        table_name = data["TableName"]
1✔
2400
        view_type = stream["StreamViewType"]
1✔
2401

2402
        dynamodbstreams_api.add_dynamodb_stream(
1✔
2403
            account_id=account_id,
2404
            region_name=region_name,
2405
            table_name=table_name,
2406
            latest_stream_label=latest_stream_label,
2407
            view_type=view_type,
2408
            enabled=enabled,
2409
        )
2410

2411

2412
def dynamodb_get_table_stream_specification(account_id: str, region_name: str, table_name: str):
1✔
2413
    try:
×
2414
        table_schema = SchemaExtractor.get_table_schema(
×
2415
            table_name, account_id=account_id, region_name=region_name
2416
        )
2417
        return table_schema["Table"].get("StreamSpecification")
×
2418
    except Exception as e:
×
2419
        LOG.info(
×
2420
            "Unable to get stream specification for table %s: %s %s",
2421
            table_name,
2422
            e,
2423
            traceback.format_exc(),
2424
        )
2425
        raise e
×
2426

2427

2428
def find_item_for_keys_values_in_batch(
1✔
2429
    table_name: str, item_keys: dict, batch: BatchGetResponseMap
2430
) -> AttributeMap | None:
2431
    """
2432
    This function looks up in the existing items for the provided item keys subset. If present, returns the
2433
    full item.
2434
    :param table_name: the table name for the item
2435
    :param item_keys: the request item keys
2436
    :param batch: the values in which to look for the item
2437
    :return: a DynamoDB Item (AttributeMap)
2438
    """
2439
    keys = item_keys.items()
1✔
2440
    for item in batch.get(table_name, []):
1✔
2441
        if keys <= item.items():
1✔
2442
            return item
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc