• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20942662173

12 Jan 2026 04:45PM UTC coverage: 86.905% (-0.03%) from 86.936%
20942662173

push

github

web-flow
Allow authenticated pull and push of docker images (#13569)

34 of 51 new or added lines in 4 files covered. (66.67%)

247 existing lines in 15 files now uncovered.

70218 of 80799 relevant lines covered (86.9%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.95
/localstack-core/localstack/services/dynamodb/provider.py
1
import copy
1✔
2
import json
1✔
3
import logging
1✔
4
import os
1✔
5
import random
1✔
6
import re
1✔
7
import threading
1✔
8
import time
1✔
9
import traceback
1✔
10
from collections import defaultdict
1✔
11
from concurrent.futures import ThreadPoolExecutor
1✔
12
from contextlib import contextmanager
1✔
13
from datetime import datetime
1✔
14
from operator import itemgetter
1✔
15

16
import requests
1✔
17
import werkzeug
1✔
18

19
from localstack import config
1✔
20
from localstack.aws import handlers
1✔
21
from localstack.aws.api import (
1✔
22
    CommonServiceException,
23
    RequestContext,
24
    ServiceRequest,
25
    ServiceResponse,
26
    handler,
27
)
28
from localstack.aws.api.dynamodb import (
1✔
29
    ApproximateCreationDateTimePrecision,
30
    AttributeMap,
31
    BatchExecuteStatementOutput,
32
    BatchGetItemOutput,
33
    BatchGetRequestMap,
34
    BatchGetResponseMap,
35
    BatchWriteItemInput,
36
    BatchWriteItemOutput,
37
    BatchWriteItemRequestMap,
38
    BillingMode,
39
    ContinuousBackupsDescription,
40
    ContinuousBackupsStatus,
41
    CreateGlobalTableOutput,
42
    CreateTableInput,
43
    CreateTableOutput,
44
    Delete,
45
    DeleteItemInput,
46
    DeleteItemOutput,
47
    DeleteRequest,
48
    DeleteTableOutput,
49
    DescribeContinuousBackupsOutput,
50
    DescribeContributorInsightsInput,
51
    DescribeContributorInsightsOutput,
52
    DescribeGlobalTableOutput,
53
    DescribeKinesisStreamingDestinationOutput,
54
    DescribeTableOutput,
55
    DescribeTimeToLiveOutput,
56
    DestinationStatus,
57
    DynamodbApi,
58
    EnableKinesisStreamingConfiguration,
59
    ExecuteStatementInput,
60
    ExecuteStatementOutput,
61
    ExecuteTransactionInput,
62
    ExecuteTransactionOutput,
63
    GetItemInput,
64
    GetItemOutput,
65
    GlobalTableAlreadyExistsException,
66
    GlobalTableNotFoundException,
67
    KinesisStreamingDestinationOutput,
68
    ListGlobalTablesOutput,
69
    ListTablesInputLimit,
70
    ListTablesOutput,
71
    ListTagsOfResourceOutput,
72
    NextTokenString,
73
    PartiQLBatchRequest,
74
    PointInTimeRecoveryDescription,
75
    PointInTimeRecoverySpecification,
76
    PointInTimeRecoveryStatus,
77
    PositiveIntegerObject,
78
    ProvisionedThroughputExceededException,
79
    Put,
80
    PutItemInput,
81
    PutItemOutput,
82
    PutRequest,
83
    QueryInput,
84
    QueryOutput,
85
    RegionName,
86
    ReplicaDescription,
87
    ReplicaList,
88
    ReplicaStatus,
89
    ReplicaUpdateList,
90
    ResourceArnString,
91
    ResourceInUseException,
92
    ResourceNotFoundException,
93
    ReturnConsumedCapacity,
94
    ScanInput,
95
    ScanOutput,
96
    StreamArn,
97
    TableArn,
98
    TableDescription,
99
    TableName,
100
    TagKeyList,
101
    TagList,
102
    TimeToLiveSpecification,
103
    TransactGetItemList,
104
    TransactGetItemsOutput,
105
    TransactWriteItem,
106
    TransactWriteItemList,
107
    TransactWriteItemsInput,
108
    TransactWriteItemsOutput,
109
    Update,
110
    UpdateContinuousBackupsOutput,
111
    UpdateGlobalTableOutput,
112
    UpdateItemInput,
113
    UpdateItemOutput,
114
    UpdateKinesisStreamingConfiguration,
115
    UpdateKinesisStreamingDestinationOutput,
116
    UpdateTableInput,
117
    UpdateTableOutput,
118
    UpdateTimeToLiveOutput,
119
    WriteRequest,
120
)
121
from localstack.aws.api.dynamodbstreams import StreamStatus
1✔
122
from localstack.aws.connect import connect_to
1✔
123
from localstack.config import is_persistence_enabled
1✔
124
from localstack.constants import (
1✔
125
    AUTH_CREDENTIAL_REGEX,
126
    AWS_REGION_US_EAST_1,
127
    INTERNAL_AWS_SECRET_ACCESS_KEY,
128
)
129
from localstack.http import Request, Response, route
1✔
130
from localstack.services.dynamodb.models import (
1✔
131
    DynamoDBStore,
132
    RecordsMap,
133
    StreamRecord,
134
    StreamRecords,
135
    TableRecords,
136
    TableStreamType,
137
    dynamodb_stores,
138
)
139
from localstack.services.dynamodb.server import DynamodbServer
1✔
140
from localstack.services.dynamodb.utils import (
1✔
141
    ItemFinder,
142
    ItemSet,
143
    SchemaExtractor,
144
    de_dynamize_record,
145
    extract_table_name_from_partiql_update,
146
    get_ddb_access_key,
147
    modify_ddblocal_arns,
148
)
149
from localstack.services.dynamodbstreams import dynamodbstreams_api
1✔
150
from localstack.services.dynamodbstreams.models import dynamodbstreams_stores
1✔
151
from localstack.services.edge import ROUTER
1✔
152
from localstack.services.plugins import ServiceLifecycleHook
1✔
153
from localstack.state import AssetDirectory, StateVisitor
1✔
154
from localstack.utils.aws import arns
1✔
155
from localstack.utils.aws.arns import (
1✔
156
    extract_account_id_from_arn,
157
    extract_region_from_arn,
158
    get_partition,
159
)
160
from localstack.utils.aws.aws_stack import get_valid_regions_for_service
1✔
161
from localstack.utils.aws.request_context import (
1✔
162
    extract_account_id_from_headers,
163
    extract_region_from_headers,
164
)
165
from localstack.utils.collections import select_attributes, select_from_typed_dict
1✔
166
from localstack.utils.common import short_uid, to_bytes
1✔
167
from localstack.utils.files import cp_r, rm_rf
1✔
168
from localstack.utils.json import BytesEncoder, canonical_json
1✔
169
from localstack.utils.scheduler import Scheduler
1✔
170
from localstack.utils.strings import long_uid, md5, to_str
1✔
171
from localstack.utils.threads import FuncThread, start_thread
1✔
172

173
# set up logger
174
LOG = logging.getLogger(__name__)
1✔
175

176
# action header prefix
177
ACTION_PREFIX = "DynamoDB_20120810."
1✔
178

179
# list of actions subject to throughput limitations
180
READ_THROTTLED_ACTIONS = [
1✔
181
    "GetItem",
182
    "Query",
183
    "Scan",
184
    "TransactGetItems",
185
    "BatchGetItem",
186
]
187
WRITE_THROTTLED_ACTIONS = [
1✔
188
    "PutItem",
189
    "BatchWriteItem",
190
    "UpdateItem",
191
    "DeleteItem",
192
    "TransactWriteItems",
193
]
194
THROTTLED_ACTIONS = READ_THROTTLED_ACTIONS + WRITE_THROTTLED_ACTIONS
1✔
195

196
MANAGED_KMS_KEYS = {}
1✔
197

198

199
def dynamodb_table_exists(table_name: str, client=None) -> bool:
1✔
200
    client = client or connect_to().dynamodb
1✔
201
    paginator = client.get_paginator("list_tables")
1✔
202
    pages = paginator.paginate(PaginationConfig={"PageSize": 100})
1✔
203
    table_name = to_str(table_name)
1✔
204
    return any(table_name in page["TableNames"] for page in pages)
1✔
205

206

207
class EventForwarder:
1✔
208
    def __init__(self, num_thread: int = 10):
1✔
209
        self.executor = ThreadPoolExecutor(num_thread, thread_name_prefix="ddb_stream_fwd")
1✔
210

211
    def shutdown(self):
1✔
212
        self.executor.shutdown(wait=False)
1✔
213

214
    def forward_to_targets(
1✔
215
        self, account_id: str, region_name: str, records_map: RecordsMap, background: bool = True
216
    ) -> None:
217
        if background:
1✔
218
            self._submit_records(
1✔
219
                account_id=account_id,
220
                region_name=region_name,
221
                records_map=records_map,
222
            )
223
        else:
224
            self._forward(account_id, region_name, records_map)
×
225

226
    def _submit_records(self, account_id: str, region_name: str, records_map: RecordsMap):
1✔
227
        """Required for patching submit with local thread context for EventStudio"""
228
        self.executor.submit(
1✔
229
            self._forward,
230
            account_id,
231
            region_name,
232
            records_map,
233
        )
234

235
    def _forward(self, account_id: str, region_name: str, records_map: RecordsMap) -> None:
1✔
236
        try:
1✔
237
            self.forward_to_kinesis_stream(account_id, region_name, records_map)
1✔
238
        except Exception as e:
×
239
            LOG.debug(
×
240
                "Error while publishing to Kinesis streams: '%s'",
241
                e,
242
                exc_info=LOG.isEnabledFor(logging.DEBUG),
243
            )
244

245
        try:
1✔
246
            self.forward_to_ddb_stream(account_id, region_name, records_map)
1✔
247
        except Exception as e:
×
248
            LOG.debug(
×
249
                "Error while publishing to DynamoDB streams, '%s'",
250
                e,
251
                exc_info=LOG.isEnabledFor(logging.DEBUG),
252
            )
253

254
    @staticmethod
1✔
255
    def forward_to_ddb_stream(account_id: str, region_name: str, records_map: RecordsMap) -> None:
1✔
256
        dynamodbstreams_api.forward_events(account_id, region_name, records_map)
1✔
257

258
    @staticmethod
1✔
259
    def forward_to_kinesis_stream(
1✔
260
        account_id: str, region_name: str, records_map: RecordsMap
261
    ) -> None:
262
        # You can only stream data from DynamoDB to Kinesis Data Streams in the same AWS account and AWS Region as your
263
        # table.
264
        # You can only stream data from a DynamoDB table to one Kinesis data stream.
265
        store = get_store(account_id, region_name)
1✔
266

267
        for table_name, table_records in records_map.items():
1✔
268
            table_stream_type = table_records["table_stream_type"]
1✔
269
            if not table_stream_type.is_kinesis:
1✔
270
                continue
1✔
271

272
            kinesis_records = []
1✔
273

274
            table_arn = arns.dynamodb_table_arn(table_name, account_id, region_name)
1✔
275
            records = table_records["records"]
1✔
276
            table_def = store.table_definitions.get(table_name) or {}
1✔
277
            stream_arn = table_def["KinesisDataStreamDestinations"][-1]["StreamArn"]
1✔
278
            for record in records:
1✔
279
                kinesis_record = dict(
1✔
280
                    tableName=table_name,
281
                    recordFormat="application/json",
282
                    userIdentity=None,
283
                    **record,
284
                )
285
                fields_to_remove = {"StreamViewType", "SequenceNumber"}
1✔
286
                kinesis_record["dynamodb"] = {
1✔
287
                    k: v for k, v in record["dynamodb"].items() if k not in fields_to_remove
288
                }
289
                kinesis_record.pop("eventVersion", None)
1✔
290

291
                hash_keys = list(
1✔
292
                    filter(lambda key: key["KeyType"] == "HASH", table_def["KeySchema"])
293
                )
294
                # TODO: reverse properly how AWS creates the partition key, it seems to be an MD5 hash
295
                kinesis_partition_key = md5(f"{table_name}{hash_keys[0]['AttributeName']}")
1✔
296

297
                kinesis_records.append(
1✔
298
                    {
299
                        "Data": json.dumps(kinesis_record, cls=BytesEncoder),
300
                        "PartitionKey": kinesis_partition_key,
301
                    }
302
                )
303

304
            kinesis = connect_to(
1✔
305
                aws_access_key_id=account_id,
306
                aws_secret_access_key=INTERNAL_AWS_SECRET_ACCESS_KEY,
307
                region_name=region_name,
308
            ).kinesis.request_metadata(service_principal="dynamodb", source_arn=table_arn)
309

310
            kinesis.put_records(
1✔
311
                StreamARN=stream_arn,
312
                Records=kinesis_records,
313
            )
314

315
    @classmethod
1✔
316
    def is_kinesis_stream_exists(cls, stream_arn):
1✔
317
        account_id = extract_account_id_from_arn(stream_arn)
1✔
318
        region_name = extract_region_from_arn(stream_arn)
1✔
319

320
        kinesis = connect_to(
1✔
321
            aws_access_key_id=account_id,
322
            aws_secret_access_key=INTERNAL_AWS_SECRET_ACCESS_KEY,
323
            region_name=region_name,
324
        ).kinesis
325
        stream_name_from_arn = stream_arn.split("/", 1)[1]
1✔
326
        # check if the stream exists in kinesis for the user
327
        filtered = list(
1✔
328
            filter(
329
                lambda stream_name: stream_name == stream_name_from_arn,
330
                kinesis.list_streams()["StreamNames"],
331
            )
332
        )
333
        return bool(filtered)
1✔
334

335

336
class SSEUtils:
1✔
337
    """Utils for server-side encryption (SSE)"""
338

339
    @classmethod
1✔
340
    def get_sse_kms_managed_key(cls, account_id: str, region_name: str):
1✔
341
        from localstack.services.kms import provider
1✔
342

343
        existing_key = MANAGED_KMS_KEYS.get(region_name)
1✔
344
        if existing_key:
1✔
345
            return existing_key
1✔
346
        kms_client = connect_to(
1✔
347
            aws_access_key_id=account_id,
348
            aws_secret_access_key=INTERNAL_AWS_SECRET_ACCESS_KEY,
349
            region_name=region_name,
350
        ).kms
351
        key_data = kms_client.create_key(
1✔
352
            Description="Default key that protects my DynamoDB data when no other key is defined"
353
        )
354
        key_id = key_data["KeyMetadata"]["KeyId"]
1✔
355

356
        provider.set_key_managed(key_id, account_id, region_name)
1✔
357
        MANAGED_KMS_KEYS[region_name] = key_id
1✔
358
        return key_id
1✔
359

360
    @classmethod
1✔
361
    def get_sse_description(cls, account_id: str, region_name: str, data):
1✔
362
        if data.get("Enabled"):
1✔
363
            kms_master_key_id = data.get("KMSMasterKeyId")
1✔
364
            if not kms_master_key_id:
1✔
365
                # this is of course not the actual key for dynamodb, just a better, since existing, mock
366
                kms_master_key_id = cls.get_sse_kms_managed_key(account_id, region_name)
1✔
367
            kms_master_key_id = arns.kms_key_arn(kms_master_key_id, account_id, region_name)
1✔
368
            return {
1✔
369
                "Status": "ENABLED",
370
                "SSEType": "KMS",  # no other value is allowed here
371
                "KMSMasterKeyArn": kms_master_key_id,
372
            }
373
        return {}
×
374

375

376
class ValidationException(CommonServiceException):
1✔
377
    def __init__(self, message: str):
1✔
378
        super().__init__(code="ValidationException", status_code=400, message=message)
1✔
379

380

381
def get_store(account_id: str, region_name: str) -> DynamoDBStore:
1✔
382
    # special case: AWS NoSQL Workbench sends "localhost" as region - replace with proper region here
383
    region_name = DynamoDBProvider.ddb_region_name(region_name)
1✔
384
    return dynamodb_stores[account_id][region_name]
1✔
385

386

387
@contextmanager
1✔
388
def modify_context_region(context: RequestContext, region: str):
1✔
389
    """
390
    Context manager that modifies the region of a `RequestContext`. At the exit, the context is restored to its
391
    original state.
392

393
    :param context: the context to modify
394
    :param region: the modified region
395
    :return: a modified `RequestContext`
396
    """
397
    original_region = context.region
1✔
398
    original_authorization = context.request.headers.get("Authorization")
1✔
399

400
    key = get_ddb_access_key(context.account_id, region)
1✔
401

402
    context.region = region
1✔
403
    context.request.headers["Authorization"] = re.sub(
1✔
404
        AUTH_CREDENTIAL_REGEX,
405
        rf"Credential={key}/\2/{region}/\4/",
406
        original_authorization or "",
407
        flags=re.IGNORECASE,
408
    )
409

410
    try:
1✔
411
        yield context
1✔
412
    except Exception:
1✔
413
        raise
1✔
414
    finally:
415
        # revert the original context
416
        context.region = original_region
1✔
417
        context.request.headers["Authorization"] = original_authorization
1✔
418

419

420
class DynamoDBDeveloperEndpoints:
1✔
421
    """
422
    Developer endpoints for DynamoDB
423
    DELETE /_aws/dynamodb/expired - delete expired items from tables with TTL enabled; return the number of expired
424
        items deleted
425
    """
426

427
    @route("/_aws/dynamodb/expired", methods=["DELETE"])
1✔
428
    def delete_expired_messages(self, _: Request):
1✔
429
        no_expired_items = delete_expired_items()
1✔
430
        return {"ExpiredItems": no_expired_items}
1✔
431

432

433
def delete_expired_items() -> int:
1✔
434
    """
435
    This utility function iterates over all stores, looks for tables with TTL enabled,
436
    scan such tables and delete expired items.
437
    """
438
    no_expired_items = 0
1✔
439
    for account_id, region_name, state in dynamodb_stores.iter_stores():
1✔
440
        ttl_specs = state.ttl_specifications
1✔
441
        client = connect_to(aws_access_key_id=account_id, region_name=region_name).dynamodb
1✔
442
        for table_name, ttl_spec in ttl_specs.items():
1✔
443
            if ttl_spec.get("Enabled", False):
1✔
444
                attribute_name = ttl_spec.get("AttributeName")
1✔
445
                current_time = int(datetime.now().timestamp())
1✔
446
                try:
1✔
447
                    result = client.scan(
1✔
448
                        TableName=table_name,
449
                        FilterExpression="#ttl <= :threshold",
450
                        ExpressionAttributeValues={":threshold": {"N": str(current_time)}},
451
                        ExpressionAttributeNames={"#ttl": attribute_name},
452
                    )
453
                    items_to_delete = result.get("Items", [])
1✔
454
                    no_expired_items += len(items_to_delete)
1✔
455
                    table_description = client.describe_table(TableName=table_name)
1✔
456
                    partition_key, range_key = _get_hash_and_range_key(table_description)
1✔
457
                    keys_to_delete = [
1✔
458
                        {partition_key: item.get(partition_key)}
459
                        if range_key is None
460
                        else {
461
                            partition_key: item.get(partition_key),
462
                            range_key: item.get(range_key),
463
                        }
464
                        for item in items_to_delete
465
                    ]
466
                    delete_requests = [{"DeleteRequest": {"Key": key}} for key in keys_to_delete]
1✔
467
                    for i in range(0, len(delete_requests), 25):
1✔
468
                        batch = delete_requests[i : i + 25]
1✔
469
                        client.batch_write_item(RequestItems={table_name: batch})
1✔
470
                except Exception as e:
×
471
                    LOG.warning(
×
472
                        "An error occurred when deleting expired items from table %s: %s",
473
                        table_name,
474
                        e,
475
                    )
476
    return no_expired_items
1✔
477

478

479
def _get_hash_and_range_key(table_description: DescribeTableOutput) -> [str, str | None]:
1✔
480
    key_schema = table_description.get("Table", {}).get("KeySchema", [])
1✔
481
    hash_key, range_key = None, None
1✔
482
    for key in key_schema:
1✔
483
        if key["KeyType"] == "HASH":
1✔
484
            hash_key = key["AttributeName"]
1✔
485
        if key["KeyType"] == "RANGE":
1✔
486
            range_key = key["AttributeName"]
1✔
487
    return hash_key, range_key
1✔
488

489

490
class ExpiredItemsWorker:
1✔
491
    """A worker that periodically computes and deletes expired items from DynamoDB tables"""
492

493
    def __init__(self) -> None:
1✔
494
        super().__init__()
1✔
495
        self.scheduler = Scheduler()
1✔
496
        self.thread: FuncThread | None = None
1✔
497
        self.mutex = threading.RLock()
1✔
498

499
    def start(self):
1✔
500
        with self.mutex:
×
501
            if self.thread:
×
502
                return
×
503

504
            self.scheduler = Scheduler()
×
505
            self.scheduler.schedule(
×
506
                delete_expired_items, period=60 * 60
507
            )  # the background process seems slow on AWS
508

509
            def _run(*_args):
×
510
                self.scheduler.run()
×
511

512
            self.thread = start_thread(_run, name="ddb-remove-expired-items")
×
513

514
    def stop(self):
1✔
515
        with self.mutex:
1✔
516
            if self.scheduler:
1✔
517
                self.scheduler.close()
1✔
518

519
            if self.thread:
1✔
520
                self.thread.stop()
×
521

522
            self.thread = None
1✔
523
            self.scheduler = None
1✔
524

525

526
class DynamoDBProvider(DynamodbApi, ServiceLifecycleHook):
1✔
527
    server: DynamodbServer
1✔
528
    """The instance of the server managing the instance of DynamoDB local"""
1✔
529
    asset_directory = f"{config.dirs.data}/dynamodb"
1✔
530
    """The directory that contains the .db files saved by DynamoDB Local"""
1✔
531
    tmp_asset_directory = f"{config.dirs.tmp}/dynamodb"
1✔
532
    """Temporary directory for the .db files saved by DynamoDB Local when MANUAL snapshot persistence is enabled"""
1✔
533

534
    def __init__(self):
1✔
535
        self.server = self._new_dynamodb_server()
1✔
536
        self._expired_items_worker = ExpiredItemsWorker()
1✔
537
        self._router_rules = []
1✔
538
        # TODO: fix _event_forwarder to have lazy instantiation of the ThreadPoolExecutor
539
        self._event_forwarder = EventForwarder()
1✔
540

541
    def on_before_start(self):
1✔
542
        # We must copy back whatever state is saved to the temporary location to avoid to start always from a blank
543
        # state. See the `on_before_state_save` hook.
544
        if is_persistence_enabled() and config.SNAPSHOT_SAVE_STRATEGY == "MANUAL":
1✔
545
            if os.path.exists(self.asset_directory):
×
546
                LOG.debug("Copying %s to %s", self.tmp_asset_directory, self.asset_directory)
×
547
                cp_r(self.asset_directory, self.tmp_asset_directory, rm_dest_on_conflict=True)
×
548

549
        self.server.start_dynamodb()
1✔
550
        if config.DYNAMODB_REMOVE_EXPIRED_ITEMS:
1✔
551
            self._expired_items_worker.start()
×
552
        self._router_rules = ROUTER.add(DynamoDBDeveloperEndpoints())
1✔
553

554
    def on_before_stop(self):
1✔
555
        self._expired_items_worker.stop()
1✔
556
        ROUTER.remove(self._router_rules)
1✔
557
        self._event_forwarder.shutdown()
1✔
558

559
    def accept_state_visitor(self, visitor: StateVisitor):
1✔
560
        visitor.visit(dynamodb_stores)
×
561
        visitor.visit(dynamodbstreams_stores)
×
562
        visitor.visit(AssetDirectory(self.service, os.path.join(config.dirs.data, self.service)))
×
563

564
    def on_before_state_reset(self):
1✔
565
        self.server.stop_dynamodb()
×
566
        rm_rf(self.tmp_asset_directory)
×
567

568
    def on_before_state_load(self):
1✔
569
        self.server.stop_dynamodb()
×
570

571
    def on_after_state_reset(self):
1✔
572
        self.server.start_dynamodb()
×
573

574
    @staticmethod
1✔
575
    def _new_dynamodb_server() -> DynamodbServer:
1✔
576
        return DynamodbServer.get()
1✔
577

578
    def on_after_state_load(self):
1✔
579
        self.server.start_dynamodb()
×
580

581
    def on_before_state_save(self) -> None:
1✔
582
        # When the save strategy is MANUAL, we do not save the DB path to the usual ``confid.dirs.data`` folder.
583
        # With the MANUAL strategy, we want to take a snapshot on-demand but this is not possible if the DB files
584
        # are already in ``config.dirs.data``. For instance, the set of operation below will result in both tables
585
        # being implicitly saved.
586
        # - awslocal dynamodb create-table table1
587
        # - curl -X POST http://localhost:4566/_localstack/state/save
588
        # - awslocal dynamodb create-table table2
589
        # To avoid this problem, we start the DDBLocal server in a temporary directory that is then copied over
590
        # ``config.dirs.data`` when the save needs to be saved.
591
        # The ideal solution to the problem would be to always start the server in memory and have a dump capability.
592
        if is_persistence_enabled() and config.SNAPSHOT_SAVE_STRATEGY == "MANUAL":
×
593
            LOG.debug("Copying %s to %s", self.tmp_asset_directory, self.asset_directory)
×
594
            cp_r(self.tmp_asset_directory, self.asset_directory, rm_dest_on_conflict=True)
×
595

596
    def on_after_init(self):
1✔
597
        # add response processor specific to ddblocal
598
        handlers.modify_service_response.append(self.service, modify_ddblocal_arns)
1✔
599

600
        # routes for the shell ui
601
        ROUTER.add(
1✔
602
            path="/shell",
603
            endpoint=self.handle_shell_ui_redirect,
604
            methods=["GET"],
605
        )
606
        ROUTER.add(
1✔
607
            path="/shell/<regex('.*'):req_path>",
608
            endpoint=self.handle_shell_ui_request,
609
        )
610

611
    def _forward_request(
1✔
612
        self,
613
        context: RequestContext,
614
        region: str | None,
615
        service_request: ServiceRequest | None = None,
616
    ) -> ServiceResponse:
617
        """
618
        Modify the context region and then forward request to DynamoDB Local.
619

620
        This is used for operations impacted by global tables. In LocalStack, a single copy of global table
621
        is kept, and any requests to replicated tables are forwarded to this original table.
622
        """
623
        if region:
1✔
624
            with modify_context_region(context, region):
1✔
625
                return self.forward_request(context, service_request=service_request)
1✔
626
        return self.forward_request(context, service_request=service_request)
×
627

628
    def forward_request(
1✔
629
        self, context: RequestContext, service_request: ServiceRequest = None
630
    ) -> ServiceResponse:
631
        """
632
        Forward a request to DynamoDB Local.
633
        """
634
        self.check_provisioned_throughput(context.operation.name)
1✔
635
        self.prepare_request_headers(
1✔
636
            context.request.headers, account_id=context.account_id, region_name=context.region
637
        )
638
        return self.server.proxy(context, service_request)
1✔
639

640
    def get_forward_url(self, account_id: str, region_name: str) -> str:
1✔
641
        """Return the URL of the backend DynamoDBLocal server to forward requests to"""
642
        return self.server.url
×
643

644
    def handle_shell_ui_redirect(self, request: werkzeug.Request) -> Response:
1✔
645
        headers = {"Refresh": f"0; url={config.external_service_url()}/shell/index.html"}
×
646
        return Response("", headers=headers)
×
647

648
    def handle_shell_ui_request(self, request: werkzeug.Request, req_path: str) -> Response:
1✔
649
        # TODO: "DynamoDB Local Web Shell was deprecated with version 1.16.X and is not available any
650
        #  longer from 1.17.X to latest. There are no immediate plans for a new Web Shell to be introduced."
651
        #  -> keeping this for now, to allow configuring custom installs; should consider removing it in the future
652
        # https://repost.aws/questions/QUHyIzoEDqQ3iOKlUEp1LPWQ#ANdBm9Nz9TRf6VqR3jZtcA1g
653
        req_path = f"/{req_path}" if not req_path.startswith("/") else req_path
×
654
        account_id = extract_account_id_from_headers(request.headers)
×
655
        region_name = extract_region_from_headers(request.headers)
×
656
        url = f"{self.get_forward_url(account_id, region_name)}/shell{req_path}"
×
657
        result = requests.request(
×
658
            method=request.method, url=url, headers=request.headers, data=request.data
659
        )
660
        return Response(result.content, headers=dict(result.headers), status=result.status_code)
×
661

662
    #
663
    # Table ops
664
    #
665

666
    @handler("CreateTable", expand=False)
1✔
667
    def create_table(
1✔
668
        self,
669
        context: RequestContext,
670
        create_table_input: CreateTableInput,
671
    ) -> CreateTableOutput:
672
        table_name = create_table_input["TableName"]
1✔
673

674
        # Return this specific error message to keep parity with AWS
675
        if self.table_exists(context.account_id, context.region, table_name):
1✔
676
            raise ResourceInUseException(f"Table already exists: {table_name}")
1✔
677

678
        billing_mode = create_table_input.get("BillingMode")
1✔
679
        provisioned_throughput = create_table_input.get("ProvisionedThroughput")
1✔
680
        if billing_mode == BillingMode.PAY_PER_REQUEST and provisioned_throughput is not None:
1✔
681
            raise ValidationException(
1✔
682
                "One or more parameter values were invalid: Neither ReadCapacityUnits nor WriteCapacityUnits can be "
683
                "specified when BillingMode is PAY_PER_REQUEST"
684
            )
685

686
        result = self.forward_request(context)
1✔
687

688
        table_description = result["TableDescription"]
1✔
689
        table_description["TableArn"] = table_arn = self.fix_table_arn(
1✔
690
            context.account_id, context.region, table_description["TableArn"]
691
        )
692

693
        backend = get_store(context.account_id, context.region)
1✔
694
        backend.table_definitions[table_name] = table_definitions = dict(create_table_input)
1✔
695
        backend.TABLE_REGION[table_name] = context.region
1✔
696

697
        if "TableId" not in table_definitions:
1✔
698
            table_definitions["TableId"] = long_uid()
1✔
699

700
        if "SSESpecification" in table_definitions:
1✔
701
            sse_specification = table_definitions.pop("SSESpecification")
1✔
702
            table_definitions["SSEDescription"] = SSEUtils.get_sse_description(
1✔
703
                context.account_id, context.region, sse_specification
704
            )
705

706
        if table_definitions:
1✔
707
            table_content = result.get("Table", {})
1✔
708
            table_content.update(table_definitions)
1✔
709
            table_description.update(table_content)
1✔
710

711
        if "StreamSpecification" in table_definitions:
1✔
712
            create_dynamodb_stream(
1✔
713
                context.account_id,
714
                context.region,
715
                table_definitions,
716
                table_description.get("LatestStreamLabel"),
717
            )
718

719
        if "TableClass" in table_definitions:
1✔
720
            table_class = table_description.pop("TableClass", None) or table_definitions.pop(
1✔
721
                "TableClass"
722
            )
723
            table_description["TableClassSummary"] = {"TableClass": table_class}
1✔
724

725
        if "GlobalSecondaryIndexes" in table_description:
1✔
726
            gsis = copy.deepcopy(table_description["GlobalSecondaryIndexes"])
1✔
727
            # update the different values, as DynamoDB-local v2 has a regression around GSI and does not return anything
728
            # anymore
729
            for gsi in gsis:
1✔
730
                index_name = gsi.get("IndexName", "")
1✔
731
                gsi.update(
1✔
732
                    {
733
                        "IndexArn": f"{table_arn}/index/{index_name}",
734
                        "IndexSizeBytes": 0,
735
                        "IndexStatus": "ACTIVE",
736
                        "ItemCount": 0,
737
                    }
738
                )
739
                gsi_provisioned_throughput = gsi.setdefault("ProvisionedThroughput", {})
1✔
740
                gsi_provisioned_throughput["NumberOfDecreasesToday"] = 0
1✔
741

742
                if billing_mode == BillingMode.PAY_PER_REQUEST:
1✔
743
                    gsi_provisioned_throughput["ReadCapacityUnits"] = 0
1✔
744
                    gsi_provisioned_throughput["WriteCapacityUnits"] = 0
1✔
745

746
            table_description["GlobalSecondaryIndexes"] = gsis
1✔
747

748
        if "ProvisionedThroughput" in table_description:
1✔
749
            if "NumberOfDecreasesToday" not in table_description["ProvisionedThroughput"]:
1✔
750
                table_description["ProvisionedThroughput"]["NumberOfDecreasesToday"] = 0
1✔
751

752
        if "WarmThroughput" in table_description:
1✔
753
            table_description["WarmThroughput"]["Status"] = "UPDATING"
1✔
754

755
        tags = table_definitions.pop("Tags", [])
1✔
756
        if tags:
1✔
757
            get_store(context.account_id, context.region).TABLE_TAGS[table_arn] = {
1✔
758
                tag["Key"]: tag["Value"] for tag in tags
759
            }
760

761
        # remove invalid attributes from result
762
        table_description.pop("Tags", None)
1✔
763
        table_description.pop("BillingMode", None)
1✔
764

765
        return result
1✔
766

767
    def delete_table(
1✔
768
        self, context: RequestContext, table_name: TableName, **kwargs
769
    ) -> DeleteTableOutput:
770
        global_table_region = self.get_global_table_region(context, table_name)
1✔
771

772
        self.ensure_table_exists(
1✔
773
            context.account_id,
774
            global_table_region,
775
            table_name,
776
            error_message=f"Requested resource not found: Table: {table_name} not found",
777
        )
778

779
        # Limitation note: On AWS, for a replicated table, if the source table is deleted, the replicated tables continue to exist.
780
        # This is not the case for LocalStack, where all replicated tables will also be removed if source is deleted.
781

782
        result = self._forward_request(context=context, region=global_table_region)
1✔
783

784
        table_arn = result.get("TableDescription", {}).get("TableArn")
1✔
785
        table_arn = self.fix_table_arn(context.account_id, context.region, table_arn)
1✔
786
        dynamodbstreams_api.delete_streams(context.account_id, context.region, table_arn)
1✔
787

788
        store = get_store(context.account_id, context.region)
1✔
789
        store.TABLE_TAGS.pop(table_arn, None)
1✔
790
        store.REPLICAS.pop(table_name, None)
1✔
791

792
        return result
1✔
793

794
    def describe_table(
1✔
795
        self, context: RequestContext, table_name: TableName, **kwargs
796
    ) -> DescribeTableOutput:
797
        global_table_region = self.get_global_table_region(context, table_name)
1✔
798

799
        result = self._forward_request(context=context, region=global_table_region)
1✔
800
        table_description: TableDescription = result["Table"]
1✔
801

802
        # Update table properties from LocalStack stores
803
        if table_props := get_store(context.account_id, context.region).table_properties.get(
1✔
804
            table_name
805
        ):
806
            table_description.update(table_props)
1✔
807

808
        store = get_store(context.account_id, context.region)
1✔
809

810
        # Update replication details
811
        replicas: dict[RegionName, ReplicaDescription] = store.REPLICAS.get(table_name, {})
1✔
812

813
        replica_description_list = []
1✔
814

815
        if global_table_region != context.region:
1✔
816
            replica_description_list.append(
1✔
817
                ReplicaDescription(
818
                    RegionName=global_table_region, ReplicaStatus=ReplicaStatus.ACTIVE
819
                )
820
            )
821

822
        for replica_region, replica_description in replicas.items():
1✔
823
            # The replica in the region being queried must not be returned
824
            if replica_region != context.region:
1✔
825
                replica_description_list.append(replica_description)
1✔
826

827
        if replica_description_list:
1✔
828
            table_description.update({"Replicas": replica_description_list})
1✔
829

830
        # update only TableId and SSEDescription if present
831
        if table_definitions := store.table_definitions.get(table_name):
1✔
832
            for key in ["TableId", "SSEDescription"]:
1✔
833
                if table_definitions.get(key):
1✔
834
                    table_description[key] = table_definitions[key]
1✔
835
            if "TableClass" in table_definitions:
1✔
836
                table_description["TableClassSummary"] = {
1✔
837
                    "TableClass": table_definitions["TableClass"]
838
                }
839
            if warm_throughput := table_definitions.get("WarmThroughput"):
1✔
840
                table_description["WarmThroughput"] = warm_throughput.copy()
1✔
841
                table_description["WarmThroughput"].setdefault("Status", "ACTIVE")
1✔
842

843
        if "GlobalSecondaryIndexes" in table_description:
1✔
844
            for gsi in table_description["GlobalSecondaryIndexes"]:
1✔
845
                default_values = {
1✔
846
                    "NumberOfDecreasesToday": 0,
847
                    "ReadCapacityUnits": 0,
848
                    "WriteCapacityUnits": 0,
849
                }
850
                # even if the billing mode is PAY_PER_REQUEST, AWS returns the Read and Write Capacity Units
851
                # Terraform depends on this parity for update operations
852
                gsi["ProvisionedThroughput"] = default_values | gsi.get("ProvisionedThroughput", {})
1✔
853

854
        # Set defaults for warm throughput
855
        if "WarmThroughput" not in table_description:
1✔
856
            billing_mode = table_definitions.get("BillingMode") if table_definitions else None
1✔
857
            table_description["WarmThroughput"] = {
1✔
858
                "ReadUnitsPerSecond": 12000 if billing_mode == "PAY_PER_REQUEST" else 5,
859
                "WriteUnitsPerSecond": 4000 if billing_mode == "PAY_PER_REQUEST" else 5,
860
            }
861
        table_description["WarmThroughput"]["Status"] = (
1✔
862
            table_description.get("TableStatus") or "ACTIVE"
863
        )
864

865
        return DescribeTableOutput(
1✔
866
            Table=select_from_typed_dict(TableDescription, table_description)
867
        )
868

869
    @handler("UpdateTable", expand=False)
1✔
870
    def update_table(
1✔
871
        self, context: RequestContext, update_table_input: UpdateTableInput
872
    ) -> UpdateTableOutput:
873
        table_name = update_table_input["TableName"]
1✔
874
        global_table_region = self.get_global_table_region(context, table_name)
1✔
875

876
        try:
1✔
877
            result = self._forward_request(context=context, region=global_table_region)
1✔
878
        except CommonServiceException as exc:
1✔
879
            # DynamoDBLocal refuses to update certain table params and raises.
880
            # But we still need to update this info in LocalStack stores
881
            if not (exc.code == "ValidationException" and exc.message == "Nothing to update"):
1✔
882
                raise
×
883

884
            if table_class := update_table_input.get("TableClass"):
1✔
885
                table_definitions = get_store(
1✔
886
                    context.account_id, context.region
887
                ).table_definitions.setdefault(table_name, {})
888
                table_definitions["TableClass"] = table_class
1✔
889

890
            if replica_updates := update_table_input.get("ReplicaUpdates"):
1✔
891
                store = get_store(context.account_id, global_table_region)
1✔
892

893
                # Dict with source region to set of replicated regions
894
                replicas: dict[RegionName, ReplicaDescription] = store.REPLICAS.get(table_name, {})
1✔
895

896
                for replica_update in replica_updates:
1✔
897
                    for key, details in replica_update.items():
1✔
898
                        # Replicated region
899
                        target_region = details.get("RegionName")
1✔
900

901
                        # Check if replicated region is valid
902
                        if target_region not in get_valid_regions_for_service("dynamodb"):
1✔
903
                            raise ValidationException(f"Region {target_region} is not supported")
×
904

905
                        match key:
1✔
906
                            case "Create":
1✔
907
                                if target_region in replicas:
1✔
908
                                    raise ValidationException(
×
909
                                        f"Failed to create a the new replica of table with name: '{table_name}' because one or more replicas already existed as tables."
910
                                    )
911
                                replicas[target_region] = ReplicaDescription(
1✔
912
                                    RegionName=target_region,
913
                                    KMSMasterKeyId=details.get("KMSMasterKeyId"),
914
                                    ProvisionedThroughputOverride=details.get(
915
                                        "ProvisionedThroughputOverride"
916
                                    ),
917
                                    GlobalSecondaryIndexes=details.get("GlobalSecondaryIndexes"),
918
                                    ReplicaStatus=ReplicaStatus.ACTIVE,
919
                                )
920
                            case "Delete":
1✔
921
                                try:
1✔
922
                                    replicas.pop(target_region)
1✔
923
                                except KeyError:
1✔
924
                                    raise ValidationException(
1✔
925
                                        "Update global table operation failed because one or more replicas were not part of the global table."
926
                                    )
927

928
                store.REPLICAS[table_name] = replicas
1✔
929

930
            # update response content
931
            SchemaExtractor.invalidate_table_schema(
1✔
932
                table_name, context.account_id, global_table_region
933
            )
934

935
            schema = SchemaExtractor.get_table_schema(
1✔
936
                table_name, context.account_id, global_table_region
937
            )
938

939
            if sse_specification_input := update_table_input.get("SSESpecification"):
1✔
940
                # If SSESpecification is changed, update store and return the 'UPDATING' status in the response
941
                table_definition = get_store(
1✔
942
                    context.account_id, context.region
943
                ).table_definitions.setdefault(table_name, {})
944
                if not sse_specification_input["Enabled"]:
1✔
945
                    table_definition.pop("SSEDescription", None)
1✔
946
                    schema["Table"]["SSEDescription"]["Status"] = "UPDATING"
1✔
947

948
            return UpdateTableOutput(TableDescription=schema["Table"])
1✔
949

950
        SchemaExtractor.invalidate_table_schema(table_name, context.account_id, global_table_region)
1✔
951

952
        schema = SchemaExtractor.get_table_schema(
1✔
953
            table_name, context.account_id, global_table_region
954
        )
955

956
        # TODO: DDB streams must also be created for replicas
957
        if update_table_input.get("StreamSpecification"):
1✔
958
            create_dynamodb_stream(
1✔
959
                context.account_id,
960
                context.region,
961
                update_table_input,
962
                result["TableDescription"].get("LatestStreamLabel"),
963
            )
964

965
        return UpdateTableOutput(TableDescription=schema["Table"])
1✔
966

967
    def list_tables(
1✔
968
        self,
969
        context: RequestContext,
970
        exclusive_start_table_name: TableName = None,
971
        limit: ListTablesInputLimit = None,
972
        **kwargs,
973
    ) -> ListTablesOutput:
974
        response = self.forward_request(context)
1✔
975

976
        # Add replicated tables
977
        replicas = get_store(context.account_id, context.region).REPLICAS
1✔
978
        for replicated_table, replications in replicas.items():
1✔
979
            for replica_region, replica_description in replications.items():
1✔
980
                if context.region == replica_region:
1✔
981
                    response["TableNames"].append(replicated_table)
1✔
982

983
        return response
1✔
984

985
    #
986
    # Contributor Insights
987
    #
988

989
    @handler("DescribeContributorInsights", expand=False)
1✔
990
    def describe_contributor_insights(
1✔
991
        self,
992
        context: RequestContext,
993
        describe_contributor_insights_input: DescribeContributorInsightsInput,
994
    ) -> DescribeContributorInsightsOutput:
995
        return DescribeContributorInsightsOutput(
1✔
996
            TableName=describe_contributor_insights_input["TableName"],
997
            IndexName=describe_contributor_insights_input.get("IndexName"),
998
            ContributorInsightsStatus="DISABLED",
999
        )
1000

1001
    #
1002
    # Item ops
1003
    #
1004

1005
    @handler("PutItem", expand=False)
1✔
1006
    def put_item(self, context: RequestContext, put_item_input: PutItemInput) -> PutItemOutput:
1✔
1007
        table_name = put_item_input["TableName"]
1✔
1008
        global_table_region = self.get_global_table_region(context, table_name)
1✔
1009

1010
        has_return_values = put_item_input.get("ReturnValues") == "ALL_OLD"
1✔
1011
        stream_type = get_table_stream_type(context.account_id, context.region, table_name)
1✔
1012

1013
        # if the request doesn't ask for ReturnValues and we have stream enabled, we need to modify the request to
1014
        # force DDBLocal to return those values
1015
        if stream_type and not has_return_values:
1✔
1016
            service_req = copy.copy(context.service_request)
1✔
1017
            service_req["ReturnValues"] = "ALL_OLD"
1✔
1018
            result = self._forward_request(
1✔
1019
                context=context, region=global_table_region, service_request=service_req
1020
            )
1021
        else:
1022
            result = self._forward_request(context=context, region=global_table_region)
1✔
1023

1024
        # Since this operation makes use of global table region, we need to use the same region for all
1025
        # calls made via the inter-service client. This is taken care of by passing the account ID and
1026
        # region, e.g. when getting the stream spec
1027

1028
        # Get stream specifications details for the table
1029
        if stream_type:
1✔
1030
            item = put_item_input["Item"]
1✔
1031
            # prepare record keys
1032
            keys = SchemaExtractor.extract_keys(
1✔
1033
                item=item,
1034
                table_name=table_name,
1035
                account_id=context.account_id,
1036
                region_name=global_table_region,
1037
            )
1038
            # because we modified the request, we will always have the ReturnValues if we have streams enabled
1039
            if has_return_values:
1✔
1040
                existing_item = result.get("Attributes")
×
1041
            else:
1042
                # remove the ReturnValues if the client didn't ask for it
1043
                existing_item = result.pop("Attributes", None)
1✔
1044

1045
            if existing_item == item:
1✔
1046
                return result
1✔
1047

1048
            # create record
1049
            record = self.get_record_template(
1✔
1050
                context.region,
1051
            )
1052
            record["eventName"] = "INSERT" if not existing_item else "MODIFY"
1✔
1053
            record["dynamodb"]["Keys"] = keys
1✔
1054
            record["dynamodb"]["SizeBytes"] = _get_size_bytes(item)
1✔
1055

1056
            if stream_type.needs_new_image:
1✔
1057
                record["dynamodb"]["NewImage"] = item
1✔
1058
            if stream_type.stream_view_type:
1✔
1059
                record["dynamodb"]["StreamViewType"] = stream_type.stream_view_type
1✔
1060
            if existing_item and stream_type.needs_old_image:
1✔
1061
                record["dynamodb"]["OldImage"] = existing_item
1✔
1062

1063
            records_map = {
1✔
1064
                table_name: TableRecords(records=[record], table_stream_type=stream_type)
1065
            }
1066
            self.forward_stream_records(context.account_id, context.region, records_map)
1✔
1067
        return result
1✔
1068

1069
    @handler("DeleteItem", expand=False)
1✔
1070
    def delete_item(
1✔
1071
        self,
1072
        context: RequestContext,
1073
        delete_item_input: DeleteItemInput,
1074
    ) -> DeleteItemOutput:
1075
        table_name = delete_item_input["TableName"]
1✔
1076
        global_table_region = self.get_global_table_region(context, table_name)
1✔
1077

1078
        has_return_values = delete_item_input.get("ReturnValues") == "ALL_OLD"
1✔
1079
        stream_type = get_table_stream_type(context.account_id, context.region, table_name)
1✔
1080

1081
        # if the request doesn't ask for ReturnValues and we have stream enabled, we need to modify the request to
1082
        # force DDBLocal to return those values
1083
        if stream_type and not has_return_values:
1✔
1084
            service_req = copy.copy(context.service_request)
1✔
1085
            service_req["ReturnValues"] = "ALL_OLD"
1✔
1086
            result = self._forward_request(
1✔
1087
                context=context, region=global_table_region, service_request=service_req
1088
            )
1089
        else:
1090
            result = self._forward_request(context=context, region=global_table_region)
1✔
1091

1092
        # determine and forward stream record
1093
        if stream_type:
1✔
1094
            # because we modified the request, we will always have the ReturnValues if we have streams enabled
1095
            if has_return_values:
1✔
1096
                existing_item = result.get("Attributes")
×
1097
            else:
1098
                # remove the ReturnValues if the client didn't ask for it
1099
                existing_item = result.pop("Attributes", None)
1✔
1100

1101
            if not existing_item:
1✔
1102
                return result
×
1103

1104
            # create record
1105
            record = self.get_record_template(context.region)
1✔
1106
            record["eventName"] = "REMOVE"
1✔
1107
            record["dynamodb"]["Keys"] = delete_item_input["Key"]
1✔
1108
            record["dynamodb"]["SizeBytes"] = _get_size_bytes(existing_item)
1✔
1109

1110
            if stream_type.stream_view_type:
1✔
1111
                record["dynamodb"]["StreamViewType"] = stream_type.stream_view_type
1✔
1112
            if stream_type.needs_old_image:
1✔
1113
                record["dynamodb"]["OldImage"] = existing_item
1✔
1114

1115
            records_map = {
1✔
1116
                table_name: TableRecords(records=[record], table_stream_type=stream_type)
1117
            }
1118
            self.forward_stream_records(context.account_id, context.region, records_map)
1✔
1119

1120
        return result
1✔
1121

1122
    @handler("UpdateItem", expand=False)
1✔
1123
    def update_item(
1✔
1124
        self,
1125
        context: RequestContext,
1126
        update_item_input: UpdateItemInput,
1127
    ) -> UpdateItemOutput:
1128
        # TODO: UpdateItem is harder to use ReturnValues for Streams, because it needs the Before and After images.
1129
        table_name = update_item_input["TableName"]
1✔
1130
        global_table_region = self.get_global_table_region(context, table_name)
1✔
1131

1132
        existing_item = None
1✔
1133
        stream_type = get_table_stream_type(context.account_id, context.region, table_name)
1✔
1134

1135
        # even if we don't need the OldImage, we still need to fetch the existing item to know if the event is INSERT
1136
        # or MODIFY (UpdateItem will create the object if it doesn't exist, and you don't use a ConditionExpression)
1137
        if stream_type:
1✔
1138
            existing_item = ItemFinder.find_existing_item(
1✔
1139
                put_item=update_item_input,
1140
                table_name=table_name,
1141
                account_id=context.account_id,
1142
                region_name=context.region,
1143
                endpoint_url=self.server.url,
1144
            )
1145

1146
        result = self._forward_request(context=context, region=global_table_region)
1✔
1147

1148
        # construct and forward stream record
1149
        if stream_type:
1✔
1150
            updated_item = ItemFinder.find_existing_item(
1✔
1151
                put_item=update_item_input,
1152
                table_name=table_name,
1153
                account_id=context.account_id,
1154
                region_name=context.region,
1155
                endpoint_url=self.server.url,
1156
            )
1157
            if not updated_item or updated_item == existing_item:
1✔
1158
                return result
×
1159

1160
            record = self.get_record_template(context.region)
1✔
1161
            record["eventName"] = "INSERT" if not existing_item else "MODIFY"
1✔
1162
            record["dynamodb"]["Keys"] = update_item_input["Key"]
1✔
1163
            record["dynamodb"]["SizeBytes"] = _get_size_bytes(updated_item)
1✔
1164

1165
            if stream_type.stream_view_type:
1✔
1166
                record["dynamodb"]["StreamViewType"] = stream_type.stream_view_type
1✔
1167
            if existing_item and stream_type.needs_old_image:
1✔
1168
                record["dynamodb"]["OldImage"] = existing_item
1✔
1169
            if stream_type.needs_new_image:
1✔
1170
                record["dynamodb"]["NewImage"] = updated_item
1✔
1171

1172
            records_map = {
1✔
1173
                table_name: TableRecords(records=[record], table_stream_type=stream_type)
1174
            }
1175
            self.forward_stream_records(context.account_id, context.region, records_map)
1✔
1176

1177
        return result
1✔
1178

1179
    @handler("GetItem", expand=False)
1✔
1180
    def get_item(self, context: RequestContext, get_item_input: GetItemInput) -> GetItemOutput:
1✔
1181
        table_name = get_item_input["TableName"]
1✔
1182
        global_table_region = self.get_global_table_region(context, table_name)
1✔
1183
        result = self._forward_request(context=context, region=global_table_region)
1✔
1184
        self.fix_consumed_capacity(get_item_input, result)
1✔
1185
        return result
1✔
1186

1187
    #
1188
    # Queries
1189
    #
1190

1191
    @handler("Query", expand=False)
1✔
1192
    def query(self, context: RequestContext, query_input: QueryInput) -> QueryOutput:
1✔
1193
        index_name = query_input.get("IndexName")
1✔
1194
        if index_name:
1✔
1195
            if not is_index_query_valid(context.account_id, context.region, query_input):
1✔
1196
                raise ValidationException(
1✔
1197
                    "One or more parameter values were invalid: Select type ALL_ATTRIBUTES "
1198
                    "is not supported for global secondary index id-index because its projection "
1199
                    "type is not ALL",
1200
                )
1201

1202
        table_name = query_input["TableName"]
1✔
1203
        global_table_region = self.get_global_table_region(context, table_name)
1✔
1204
        result = self._forward_request(context=context, region=global_table_region)
1✔
1205
        self.fix_consumed_capacity(query_input, result)
1✔
1206
        return result
1✔
1207

1208
    @handler("Scan", expand=False)
1✔
1209
    def scan(self, context: RequestContext, scan_input: ScanInput) -> ScanOutput:
1✔
1210
        table_name = scan_input["TableName"]
1✔
1211
        global_table_region = self.get_global_table_region(context, table_name)
1✔
1212
        result = self._forward_request(context=context, region=global_table_region)
1✔
1213
        return result
1✔
1214

1215
    #
1216
    # Batch ops
1217
    #
1218

1219
    @handler("BatchWriteItem", expand=False)
1✔
1220
    def batch_write_item(
1✔
1221
        self,
1222
        context: RequestContext,
1223
        batch_write_item_input: BatchWriteItemInput,
1224
    ) -> BatchWriteItemOutput:
1225
        # TODO: add global table support
1226
        existing_items = {}
1✔
1227
        existing_items_to_fetch: BatchWriteItemRequestMap = {}
1✔
1228
        # UnprocessedItems should have the same format as RequestItems
1229
        unprocessed_items = {}
1✔
1230
        request_items = batch_write_item_input["RequestItems"]
1✔
1231

1232
        tables_stream_type: dict[TableName, TableStreamType] = {}
1✔
1233

1234
        for table_name, items in sorted(request_items.items(), key=itemgetter(0)):
1✔
1235
            if stream_type := get_table_stream_type(context.account_id, context.region, table_name):
1✔
1236
                tables_stream_type[table_name] = stream_type
1✔
1237

1238
            for request in items:
1✔
1239
                request: WriteRequest
1240
                for key, inner_request in request.items():
1✔
1241
                    inner_request: PutRequest | DeleteRequest
1242
                    if self.should_throttle("BatchWriteItem"):
1✔
1243
                        unprocessed_items_for_table = unprocessed_items.setdefault(table_name, [])
×
1244
                        unprocessed_items_for_table.append(request)
×
1245

1246
                    elif stream_type:
1✔
1247
                        existing_items_to_fetch_for_table = existing_items_to_fetch.setdefault(
1✔
1248
                            table_name, []
1249
                        )
1250
                        existing_items_to_fetch_for_table.append(inner_request)
1✔
1251

1252
        if existing_items_to_fetch:
1✔
1253
            existing_items = ItemFinder.find_existing_items(
1✔
1254
                put_items_per_table=existing_items_to_fetch,
1255
                account_id=context.account_id,
1256
                region_name=context.region,
1257
                endpoint_url=self.server.url,
1258
            )
1259

1260
        try:
1✔
1261
            result = self.forward_request(context)
1✔
1262
        except CommonServiceException as e:
1✔
1263
            # TODO: validate if DynamoDB still raises `One of the required keys was not given a value`
1264
            # for now, replace with the schema error validation
1265
            if e.message == "One of the required keys was not given a value":
1✔
1266
                raise ValidationException("The provided key element does not match the schema")
1✔
1267
            raise e
×
1268

1269
        # determine and forward stream records
1270
        if tables_stream_type:
1✔
1271
            records_map = self.prepare_batch_write_item_records(
1✔
1272
                account_id=context.account_id,
1273
                region_name=context.region,
1274
                tables_stream_type=tables_stream_type,
1275
                request_items=request_items,
1276
                existing_items=existing_items,
1277
            )
1278
            self.forward_stream_records(context.account_id, context.region, records_map)
1✔
1279

1280
        # TODO: should unprocessed item which have mutated by `prepare_batch_write_item_records` be returned
1281
        for table_name, unprocessed_items_in_table in unprocessed_items.items():
1✔
1282
            unprocessed: dict = result["UnprocessedItems"]
×
1283
            result_unprocessed_table = unprocessed.setdefault(table_name, [])
×
1284

1285
            # add the Unprocessed items to the response
1286
            # TODO: check before if the same request has not been Unprocessed by DDB local already?
1287
            # those might actually have been processed? shouldn't we remove them from the proxied request?
1288
            for request in unprocessed_items_in_table:
×
1289
                result_unprocessed_table.append(request)
×
1290

1291
            # remove any table entry if it's empty
1292
            result["UnprocessedItems"] = {k: v for k, v in unprocessed.items() if v}
×
1293

1294
        return result
1✔
1295

1296
    @handler("BatchGetItem")
1✔
1297
    def batch_get_item(
1✔
1298
        self,
1299
        context: RequestContext,
1300
        request_items: BatchGetRequestMap,
1301
        return_consumed_capacity: ReturnConsumedCapacity = None,
1302
        **kwargs,
1303
    ) -> BatchGetItemOutput:
1304
        # TODO: add global table support
1305
        return self.forward_request(context)
1✔
1306

1307
    #
1308
    # Transactions
1309
    #
1310

1311
    @handler("TransactWriteItems", expand=False)
1✔
1312
    def transact_write_items(
1✔
1313
        self,
1314
        context: RequestContext,
1315
        transact_write_items_input: TransactWriteItemsInput,
1316
    ) -> TransactWriteItemsOutput:
1317
        # TODO: add global table support
1318
        existing_items = {}
1✔
1319
        existing_items_to_fetch: dict[str, list[Put | Update | Delete]] = {}
1✔
1320
        updated_items_to_fetch: dict[str, list[Update]] = {}
1✔
1321
        transact_items = transact_write_items_input["TransactItems"]
1✔
1322
        tables_stream_type: dict[TableName, TableStreamType] = {}
1✔
1323
        no_stream_tables = set()
1✔
1324

1325
        for item in transact_items:
1✔
1326
            item: TransactWriteItem
1327
            for key in ["Put", "Update", "Delete", "ConditionCheck"]:
1✔
1328
                inner_item: Put | Delete | Update = item.get(key)
1✔
1329
                if inner_item:
1✔
1330
                    # Extract the table name from the ARN; DynamoDB Local does not currently support
1331
                    # full ARNs in this operation: https://github.com/awslabs/amazon-dynamodb-local-samples/issues/34
1332
                    inner_item["TableName"] = table_name = inner_item["TableName"].split(":table/")[
1✔
1333
                        -1
1334
                    ]
1335
                    # if we've seen the table already exists and it does not have streams, skip
1336
                    if table_name in no_stream_tables:
1✔
1337
                        continue
1✔
1338

1339
                    # if we have not seen the table, fetch its streaming status
1340
                    if table_name not in tables_stream_type:
1✔
1341
                        if stream_type := get_table_stream_type(
1✔
1342
                            context.account_id, context.region, table_name
1343
                        ):
1344
                            tables_stream_type[table_name] = stream_type
1✔
1345
                        else:
1346
                            # no stream,
1347
                            no_stream_tables.add(table_name)
1✔
1348
                            continue
1✔
1349

1350
                    existing_items_to_fetch_for_table = existing_items_to_fetch.setdefault(
1✔
1351
                        table_name, []
1352
                    )
1353
                    existing_items_to_fetch_for_table.append(inner_item)
1✔
1354
                    if key == "Update":
1✔
1355
                        updated_items_to_fetch_for_table = updated_items_to_fetch.setdefault(
1✔
1356
                            table_name, []
1357
                        )
1358
                        updated_items_to_fetch_for_table.append(inner_item)
1✔
1359

1360
                    continue
1✔
1361
        # Normalize the request structure to ensure it matches the expected format for DynamoDB Local.
1362
        data = json.loads(context.request.data)
1✔
1363
        data["TransactItems"] = transact_items
1✔
1364
        context.request.data = to_bytes(json.dumps(data, cls=BytesEncoder))
1✔
1365

1366
        if existing_items_to_fetch:
1✔
1367
            existing_items = ItemFinder.find_existing_items(
1✔
1368
                put_items_per_table=existing_items_to_fetch,
1369
                account_id=context.account_id,
1370
                region_name=context.region,
1371
                endpoint_url=self.server.url,
1372
            )
1373

1374
        client_token: str | None = transact_write_items_input.get("ClientRequestToken")
1✔
1375

1376
        if client_token:
1✔
1377
            # we sort the payload since identical payload but with different order could cause
1378
            # IdempotentParameterMismatchException error if a client token is provided
1379
            context.request.data = to_bytes(canonical_json(json.loads(context.request.data)))
1✔
1380

1381
        result = self.forward_request(context)
1✔
1382

1383
        # determine and forward stream records
1384
        if tables_stream_type:
1✔
1385
            updated_items = (
1✔
1386
                ItemFinder.find_existing_items(
1387
                    put_items_per_table=existing_items_to_fetch,
1388
                    account_id=context.account_id,
1389
                    region_name=context.region,
1390
                    endpoint_url=self.server.url,
1391
                )
1392
                if updated_items_to_fetch
1393
                else {}
1394
            )
1395

1396
            records_map = self.prepare_transact_write_item_records(
1✔
1397
                account_id=context.account_id,
1398
                region_name=context.region,
1399
                transact_items=transact_items,
1400
                existing_items=existing_items,
1401
                updated_items=updated_items,
1402
                tables_stream_type=tables_stream_type,
1403
            )
1404
            self.forward_stream_records(context.account_id, context.region, records_map)
1✔
1405

1406
        return result
1✔
1407

1408
    @handler("TransactGetItems", expand=False)
1✔
1409
    def transact_get_items(
1✔
1410
        self,
1411
        context: RequestContext,
1412
        transact_items: TransactGetItemList,
1413
        return_consumed_capacity: ReturnConsumedCapacity = None,
1414
    ) -> TransactGetItemsOutput:
1415
        for transact_item in transact_items["TransactItems"]:
1✔
1416
            if item := transact_item.get("Get"):
1✔
1417
                # Extract the table name from the ARN; DynamoDB Local does not currently support
1418
                # full ARNs in this operation: https://github.com/awslabs/amazon-dynamodb-local-samples/issues/34
1419
                item["TableName"] = item["TableName"].split(":table/")[-1]
1✔
1420

1421
        # Normalize the request structure to ensure it matches the expected format for DynamoDB Local.
1422
        data = json.loads(context.request.data)
1✔
1423
        data["TransactItems"] = transact_items["TransactItems"]
1✔
1424
        context.request.data = to_bytes(json.dumps(data, cls=BytesEncoder))
1✔
1425

1426
        return self.forward_request(context)
1✔
1427

1428
    @handler("ExecuteTransaction", expand=False)
1✔
1429
    def execute_transaction(
1✔
1430
        self, context: RequestContext, execute_transaction_input: ExecuteTransactionInput
1431
    ) -> ExecuteTransactionOutput:
1432
        result = self.forward_request(context)
1✔
1433
        return result
1✔
1434

1435
    @handler("ExecuteStatement", expand=False)
1✔
1436
    def execute_statement(
1✔
1437
        self,
1438
        context: RequestContext,
1439
        execute_statement_input: ExecuteStatementInput,
1440
    ) -> ExecuteStatementOutput:
1441
        # TODO: this operation is still really slow with streams enabled
1442
        #  find a way to make it better, same way as the other operations, by using returnvalues
1443
        # see https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ql-reference.update.html
1444
        statement = execute_statement_input["Statement"]
1✔
1445
        # We found out that 'Parameters' can be an empty list when the request comes from the AWS JS client.
1446
        if execute_statement_input.get("Parameters", None) == []:  # noqa
1✔
1447
            raise ValidationException(
1✔
1448
                "1 validation error detected: Value '[]' at 'parameters' failed to satisfy constraint: Member must have length greater than or equal to 1"
1449
            )
1450
        table_name = extract_table_name_from_partiql_update(statement)
1✔
1451
        existing_items = None
1✔
1452
        stream_type = table_name and get_table_stream_type(
1✔
1453
            context.account_id, context.region, table_name
1454
        )
1455
        if stream_type:
1✔
1456
            # Note: fetching the entire list of items is hugely inefficient, especially for larger tables
1457
            # TODO: find a mechanism to hook into the PartiQL update mechanism of DynamoDB Local directly!
1458
            existing_items = ItemFinder.list_existing_items_for_statement(
1✔
1459
                partiql_statement=statement,
1460
                account_id=context.account_id,
1461
                region_name=context.region,
1462
                endpoint_url=self.server.url,
1463
            )
1464

1465
        result = self.forward_request(context)
1✔
1466

1467
        # construct and forward stream record
1468
        if stream_type:
1✔
1469
            records = get_updated_records(
1✔
1470
                account_id=context.account_id,
1471
                region_name=context.region,
1472
                table_name=table_name,
1473
                existing_items=existing_items,
1474
                server_url=self.server.url,
1475
                table_stream_type=stream_type,
1476
            )
1477
            self.forward_stream_records(context.account_id, context.region, records)
1✔
1478

1479
        return result
1✔
1480

1481
    #
1482
    # Tags
1483
    #
1484

1485
    def tag_resource(
1✔
1486
        self, context: RequestContext, resource_arn: ResourceArnString, tags: TagList, **kwargs
1487
    ) -> None:
1488
        table_tags = get_store(context.account_id, context.region).TABLE_TAGS
1✔
1489
        if resource_arn not in table_tags:
1✔
UNCOV
1490
            table_tags[resource_arn] = {}
×
1491
        table_tags[resource_arn].update({tag["Key"]: tag["Value"] for tag in tags})
1✔
1492

1493
    def untag_resource(
1✔
1494
        self,
1495
        context: RequestContext,
1496
        resource_arn: ResourceArnString,
1497
        tag_keys: TagKeyList,
1498
        **kwargs,
1499
    ) -> None:
1500
        for tag_key in tag_keys or []:
1✔
1501
            get_store(context.account_id, context.region).TABLE_TAGS.get(resource_arn, {}).pop(
1✔
1502
                tag_key, None
1503
            )
1504

1505
    def list_tags_of_resource(
1✔
1506
        self,
1507
        context: RequestContext,
1508
        resource_arn: ResourceArnString,
1509
        next_token: NextTokenString = None,
1510
        **kwargs,
1511
    ) -> ListTagsOfResourceOutput:
1512
        result = [
1✔
1513
            {"Key": k, "Value": v}
1514
            for k, v in get_store(context.account_id, context.region)
1515
            .TABLE_TAGS.get(resource_arn, {})
1516
            .items()
1517
        ]
1518
        return ListTagsOfResourceOutput(Tags=result)
1✔
1519

1520
    #
1521
    # TTLs
1522
    #
1523

1524
    def describe_time_to_live(
1✔
1525
        self, context: RequestContext, table_name: TableName, **kwargs
1526
    ) -> DescribeTimeToLiveOutput:
1527
        if not self.table_exists(context.account_id, context.region, table_name):
1✔
1528
            raise ResourceNotFoundException(
1✔
1529
                f"Requested resource not found: Table: {table_name} not found"
1530
            )
1531

1532
        backend = get_store(context.account_id, context.region)
1✔
1533
        ttl_spec = backend.ttl_specifications.get(table_name)
1✔
1534

1535
        result = {"TimeToLiveStatus": "DISABLED"}
1✔
1536
        if ttl_spec:
1✔
1537
            if ttl_spec.get("Enabled"):
1✔
1538
                ttl_status = "ENABLED"
1✔
1539
            else:
1540
                ttl_status = "DISABLED"
1✔
1541
            result = {
1✔
1542
                "AttributeName": ttl_spec.get("AttributeName"),
1543
                "TimeToLiveStatus": ttl_status,
1544
            }
1545

1546
        return DescribeTimeToLiveOutput(TimeToLiveDescription=result)
1✔
1547

1548
    def update_time_to_live(
1✔
1549
        self,
1550
        context: RequestContext,
1551
        table_name: TableName,
1552
        time_to_live_specification: TimeToLiveSpecification,
1553
        **kwargs,
1554
    ) -> UpdateTimeToLiveOutput:
1555
        if not self.table_exists(context.account_id, context.region, table_name):
1✔
1556
            raise ResourceNotFoundException(
1✔
1557
                f"Requested resource not found: Table: {table_name} not found"
1558
            )
1559

1560
        # TODO: TTL status is maintained/mocked but no real expiry is happening for items
1561
        backend = get_store(context.account_id, context.region)
1✔
1562
        backend.ttl_specifications[table_name] = time_to_live_specification
1✔
1563
        return UpdateTimeToLiveOutput(TimeToLiveSpecification=time_to_live_specification)
1✔
1564

1565
    #
1566
    # Global tables
1567
    #
1568

1569
    def create_global_table(
1✔
1570
        self,
1571
        context: RequestContext,
1572
        global_table_name: TableName,
1573
        replication_group: ReplicaList,
1574
        **kwargs,
1575
    ) -> CreateGlobalTableOutput:
1576
        global_tables: dict = get_store(context.account_id, context.region).GLOBAL_TABLES
1✔
1577
        if global_table_name in global_tables:
1✔
1578
            raise GlobalTableAlreadyExistsException("Global table with this name already exists")
1✔
1579
        replication_group = [grp.copy() for grp in replication_group or []]
1✔
1580
        data = {"GlobalTableName": global_table_name, "ReplicationGroup": replication_group}
1✔
1581
        global_tables[global_table_name] = data
1✔
1582
        for group in replication_group:
1✔
1583
            group["ReplicaStatus"] = "ACTIVE"
1✔
1584
            group["ReplicaStatusDescription"] = "Replica active"
1✔
1585
        return CreateGlobalTableOutput(GlobalTableDescription=data)
1✔
1586

1587
    def describe_global_table(
1✔
1588
        self, context: RequestContext, global_table_name: TableName, **kwargs
1589
    ) -> DescribeGlobalTableOutput:
1590
        details = get_store(context.account_id, context.region).GLOBAL_TABLES.get(global_table_name)
1✔
1591
        if not details:
1✔
1592
            raise GlobalTableNotFoundException("Global table with this name does not exist")
1✔
1593
        return DescribeGlobalTableOutput(GlobalTableDescription=details)
1✔
1594

1595
    def list_global_tables(
1✔
1596
        self,
1597
        context: RequestContext,
1598
        exclusive_start_global_table_name: TableName = None,
1599
        limit: PositiveIntegerObject = None,
1600
        region_name: RegionName = None,
1601
        **kwargs,
1602
    ) -> ListGlobalTablesOutput:
1603
        # TODO: add paging support
UNCOV
1604
        result = [
×
1605
            select_attributes(tab, ["GlobalTableName", "ReplicationGroup"])
1606
            for tab in get_store(context.account_id, context.region).GLOBAL_TABLES.values()
1607
        ]
UNCOV
1608
        return ListGlobalTablesOutput(GlobalTables=result)
×
1609

1610
    def update_global_table(
1✔
1611
        self,
1612
        context: RequestContext,
1613
        global_table_name: TableName,
1614
        replica_updates: ReplicaUpdateList,
1615
        **kwargs,
1616
    ) -> UpdateGlobalTableOutput:
1617
        details = get_store(context.account_id, context.region).GLOBAL_TABLES.get(global_table_name)
1✔
1618
        if not details:
1✔
UNCOV
1619
            raise GlobalTableNotFoundException("Global table with this name does not exist")
×
1620
        for update in replica_updates or []:
1✔
1621
            repl_group = details["ReplicationGroup"]
1✔
1622
            # delete existing
1623
            delete = update.get("Delete")
1✔
1624
            if delete:
1✔
1625
                details["ReplicationGroup"] = [
1✔
1626
                    g for g in repl_group if g["RegionName"] != delete["RegionName"]
1627
                ]
1628
            # create new
1629
            create = update.get("Create")
1✔
1630
            if create:
1✔
1631
                exists = [g for g in repl_group if g["RegionName"] == create["RegionName"]]
1✔
1632
                if exists:
1✔
UNCOV
1633
                    continue
×
1634
                new_group = {
1✔
1635
                    "RegionName": create["RegionName"],
1636
                    "ReplicaStatus": "ACTIVE",
1637
                    "ReplicaStatusDescription": "Replica active",
1638
                }
1639
                details["ReplicationGroup"].append(new_group)
1✔
1640
        return UpdateGlobalTableOutput(GlobalTableDescription=details)
1✔
1641

1642
    #
1643
    # Kinesis Streaming
1644
    #
1645

1646
    def enable_kinesis_streaming_destination(
1✔
1647
        self,
1648
        context: RequestContext,
1649
        table_name: TableName,
1650
        stream_arn: StreamArn,
1651
        enable_kinesis_streaming_configuration: EnableKinesisStreamingConfiguration = None,
1652
        **kwargs,
1653
    ) -> KinesisStreamingDestinationOutput:
1654
        self.ensure_table_exists(
1✔
1655
            context.account_id,
1656
            context.region,
1657
            table_name,
1658
            error_message=f"Requested resource not found: Table: {table_name} not found",
1659
        )
1660

1661
        # TODO: Use the time precision in config if set
1662
        enable_kinesis_streaming_configuration = enable_kinesis_streaming_configuration or {}
1✔
1663

1664
        stream = self._event_forwarder.is_kinesis_stream_exists(stream_arn=stream_arn)
1✔
1665
        if not stream:
1✔
UNCOV
1666
            raise ValidationException("User does not have a permission to use kinesis stream")
×
1667

1668
        table_def = get_store(context.account_id, context.region).table_definitions.setdefault(
1✔
1669
            table_name, {}
1670
        )
1671

1672
        dest_status = table_def.get("KinesisDataStreamDestinationStatus")
1✔
1673
        if dest_status not in ["DISABLED", "ENABLE_FAILED", None]:
1✔
UNCOV
1674
            raise ValidationException(
×
1675
                "Table is not in a valid state to enable Kinesis Streaming "
1676
                "Destination:EnableKinesisStreamingDestination must be DISABLED or ENABLE_FAILED "
1677
                "to perform ENABLE operation."
1678
            )
1679

1680
        table_def.setdefault("KinesisDataStreamDestinations", [])
1✔
1681

1682
        # remove the stream destination if already present
1683
        table_def["KinesisDataStreamDestinations"] = [
1✔
1684
            t for t in table_def["KinesisDataStreamDestinations"] if t["StreamArn"] != stream_arn
1685
        ]
1686
        # append the active stream destination at the end of the list
1687
        table_def["KinesisDataStreamDestinations"].append(
1✔
1688
            {
1689
                "DestinationStatus": DestinationStatus.ACTIVE,
1690
                "DestinationStatusDescription": "Stream is active",
1691
                "StreamArn": stream_arn,
1692
                "ApproximateCreationDateTimePrecision": ApproximateCreationDateTimePrecision.MILLISECOND,
1693
            }
1694
        )
1695
        table_def["KinesisDataStreamDestinationStatus"] = DestinationStatus.ACTIVE
1✔
1696
        return KinesisStreamingDestinationOutput(
1✔
1697
            DestinationStatus=DestinationStatus.ENABLING,
1698
            StreamArn=stream_arn,
1699
            TableName=table_name,
1700
            EnableKinesisStreamingConfiguration=enable_kinesis_streaming_configuration,
1701
        )
1702

1703
    def disable_kinesis_streaming_destination(
1✔
1704
        self,
1705
        context: RequestContext,
1706
        table_name: TableName,
1707
        stream_arn: StreamArn,
1708
        enable_kinesis_streaming_configuration: EnableKinesisStreamingConfiguration = None,
1709
        **kwargs,
1710
    ) -> KinesisStreamingDestinationOutput:
1711
        self.ensure_table_exists(
1✔
1712
            context.account_id,
1713
            context.region,
1714
            table_name,
1715
            error_message=f"Requested resource not found: Table: {table_name} not found",
1716
        )
1717

1718
        # TODO: Must raise if invoked before KinesisStreamingDestination is ACTIVE
1719

1720
        stream = self._event_forwarder.is_kinesis_stream_exists(stream_arn=stream_arn)
1✔
1721
        if not stream:
1✔
UNCOV
1722
            raise ValidationException(
×
1723
                "User does not have a permission to use kinesis stream",
1724
            )
1725

1726
        table_def = get_store(context.account_id, context.region).table_definitions.setdefault(
1✔
1727
            table_name, {}
1728
        )
1729

1730
        stream_destinations = table_def.get("KinesisDataStreamDestinations")
1✔
1731
        if stream_destinations:
1✔
1732
            if table_def["KinesisDataStreamDestinationStatus"] == DestinationStatus.ACTIVE:
1✔
1733
                for dest in stream_destinations:
1✔
1734
                    if (
1✔
1735
                        dest["StreamArn"] == stream_arn
1736
                        and dest["DestinationStatus"] == DestinationStatus.ACTIVE
1737
                    ):
1738
                        dest["DestinationStatus"] = DestinationStatus.DISABLED
1✔
1739
                        dest["DestinationStatusDescription"] = ("Stream is disabled",)
1✔
1740
                        table_def["KinesisDataStreamDestinationStatus"] = DestinationStatus.DISABLED
1✔
1741
                        return KinesisStreamingDestinationOutput(
1✔
1742
                            DestinationStatus=DestinationStatus.DISABLING,
1743
                            StreamArn=stream_arn,
1744
                            TableName=table_name,
1745
                        )
UNCOV
1746
        raise ValidationException(
×
1747
            "Table is not in a valid state to disable Kinesis Streaming Destination:"
1748
            "DisableKinesisStreamingDestination must be ACTIVE to perform DISABLE operation."
1749
        )
1750

1751
    def describe_kinesis_streaming_destination(
1✔
1752
        self, context: RequestContext, table_name: TableName, **kwargs
1753
    ) -> DescribeKinesisStreamingDestinationOutput:
1754
        self.ensure_table_exists(context.account_id, context.region, table_name)
1✔
1755

1756
        table_def = (
1✔
1757
            get_store(context.account_id, context.region).table_definitions.get(table_name) or {}
1758
        )
1759

1760
        stream_destinations = table_def.get("KinesisDataStreamDestinations") or []
1✔
1761
        stream_destinations = copy.deepcopy(stream_destinations)
1✔
1762

1763
        for destination in stream_destinations:
1✔
1764
            destination.pop("ApproximateCreationDateTimePrecision", None)
1✔
1765
            destination.pop("DestinationStatusDescription", None)
1✔
1766

1767
        return DescribeKinesisStreamingDestinationOutput(
1✔
1768
            KinesisDataStreamDestinations=stream_destinations,
1769
            TableName=table_name,
1770
        )
1771

1772
    def update_kinesis_streaming_destination(
1✔
1773
        self,
1774
        context: RequestContext,
1775
        table_name: TableArn,
1776
        stream_arn: StreamArn,
1777
        update_kinesis_streaming_configuration: UpdateKinesisStreamingConfiguration | None = None,
1778
        **kwargs,
1779
    ) -> UpdateKinesisStreamingDestinationOutput:
1780
        self.ensure_table_exists(context.account_id, context.region, table_name)
1✔
1781

1782
        if not update_kinesis_streaming_configuration:
1✔
1783
            raise ValidationException(
1✔
1784
                "Streaming destination cannot be updated with given parameters: "
1785
                "UpdateKinesisStreamingConfiguration cannot be null or contain only null values"
1786
            )
1787

1788
        time_precision = update_kinesis_streaming_configuration.get(
1✔
1789
            "ApproximateCreationDateTimePrecision"
1790
        )
1791
        if time_precision not in (
1✔
1792
            ApproximateCreationDateTimePrecision.MILLISECOND,
1793
            ApproximateCreationDateTimePrecision.MICROSECOND,
1794
        ):
1795
            raise ValidationException(
1✔
1796
                f"1 validation error detected: Value '{time_precision}' at "
1797
                "'updateKinesisStreamingConfiguration.approximateCreationDateTimePrecision' failed to satisfy constraint: "
1798
                "Member must satisfy enum value set: [MILLISECOND, MICROSECOND]"
1799
            )
1800

1801
        store = get_store(context.account_id, context.region)
1✔
1802

1803
        table_def = store.table_definitions.get(table_name) or {}
1✔
1804
        table_def.setdefault("KinesisDataStreamDestinations", [])
1✔
1805

1806
        table_id = table_def["TableId"]
1✔
1807

1808
        destination = None
1✔
1809
        for stream in table_def["KinesisDataStreamDestinations"]:
1✔
1810
            if stream["StreamArn"] == stream_arn:
1✔
1811
                destination = stream
1✔
1812

1813
        if destination is None:
1✔
1814
            raise ValidationException(
1✔
1815
                "Table is not in a valid state to enable Kinesis Streaming Destination: "
1816
                f"No streaming destination with streamArn: {stream_arn} found for table with tableName: {table_name}"
1817
            )
1818

1819
        if (
1✔
1820
            existing_precision := destination["ApproximateCreationDateTimePrecision"]
1821
        ) == update_kinesis_streaming_configuration["ApproximateCreationDateTimePrecision"]:
1822
            raise ValidationException(
1✔
1823
                f"Invalid Request: Precision is already set to the desired value of {existing_precision} "
1824
                f"for tableId: {table_id}, kdsArn: {stream_arn}"
1825
            )
1826

1827
        destination["ApproximateCreationDateTimePrecision"] = time_precision
1✔
1828

1829
        return UpdateKinesisStreamingDestinationOutput(
1✔
1830
            TableName=table_name,
1831
            StreamArn=stream_arn,
1832
            DestinationStatus=DestinationStatus.UPDATING,
1833
            UpdateKinesisStreamingConfiguration=UpdateKinesisStreamingConfiguration(
1834
                ApproximateCreationDateTimePrecision=time_precision,
1835
            ),
1836
        )
1837

1838
    #
1839
    # Continuous Backups
1840
    #
1841

1842
    def describe_continuous_backups(
1✔
1843
        self, context: RequestContext, table_name: TableName, **kwargs
1844
    ) -> DescribeContinuousBackupsOutput:
1845
        self.get_global_table_region(context, table_name)
1✔
1846
        store = get_store(context.account_id, context.region)
1✔
1847
        continuous_backup_description = (
1✔
1848
            store.table_properties.get(table_name, {}).get("ContinuousBackupsDescription")
1849
        ) or ContinuousBackupsDescription(
1850
            ContinuousBackupsStatus=ContinuousBackupsStatus.ENABLED,
1851
            PointInTimeRecoveryDescription=PointInTimeRecoveryDescription(
1852
                PointInTimeRecoveryStatus=PointInTimeRecoveryStatus.DISABLED
1853
            ),
1854
        )
1855

1856
        return DescribeContinuousBackupsOutput(
1✔
1857
            ContinuousBackupsDescription=continuous_backup_description
1858
        )
1859

1860
    def update_continuous_backups(
1✔
1861
        self,
1862
        context: RequestContext,
1863
        table_name: TableName,
1864
        point_in_time_recovery_specification: PointInTimeRecoverySpecification,
1865
        **kwargs,
1866
    ) -> UpdateContinuousBackupsOutput:
1867
        self.get_global_table_region(context, table_name)
1✔
1868

1869
        store = get_store(context.account_id, context.region)
1✔
1870
        pit_recovery_status = (
1✔
1871
            PointInTimeRecoveryStatus.ENABLED
1872
            if point_in_time_recovery_specification["PointInTimeRecoveryEnabled"]
1873
            else PointInTimeRecoveryStatus.DISABLED
1874
        )
1875
        continuous_backup_description = ContinuousBackupsDescription(
1✔
1876
            ContinuousBackupsStatus=ContinuousBackupsStatus.ENABLED,
1877
            PointInTimeRecoveryDescription=PointInTimeRecoveryDescription(
1878
                PointInTimeRecoveryStatus=pit_recovery_status
1879
            ),
1880
        )
1881
        table_props = store.table_properties.setdefault(table_name, {})
1✔
1882
        table_props["ContinuousBackupsDescription"] = continuous_backup_description
1✔
1883

1884
        return UpdateContinuousBackupsOutput(
1✔
1885
            ContinuousBackupsDescription=continuous_backup_description
1886
        )
1887

1888
    #
1889
    # Helpers
1890
    #
1891

1892
    @staticmethod
1✔
1893
    def ddb_region_name(region_name: str) -> str:
1✔
1894
        """Map `local` or `localhost` region to the us-east-1 region. These values are used by NoSQL Workbench."""
1895
        # TODO: could this be somehow moved into the request handler chain?
1896
        if region_name in ("local", "localhost"):
1✔
UNCOV
1897
            region_name = AWS_REGION_US_EAST_1
×
1898

1899
        return region_name
1✔
1900

1901
    @staticmethod
1✔
1902
    def table_exists(account_id: str, region_name: str, table_name: str) -> bool:
1✔
1903
        region_name = DynamoDBProvider.ddb_region_name(region_name)
1✔
1904

1905
        client = connect_to(
1✔
1906
            aws_access_key_id=account_id,
1907
            aws_secret_access_key=INTERNAL_AWS_SECRET_ACCESS_KEY,
1908
            region_name=region_name,
1909
        ).dynamodb
1910
        return dynamodb_table_exists(table_name, client)
1✔
1911

1912
    @staticmethod
1✔
1913
    def ensure_table_exists(
1✔
1914
        account_id: str,
1915
        region_name: str,
1916
        table_name: str,
1917
        error_message: str = "Cannot do operations on a non-existent table",
1918
    ):
1919
        """
1920
        Raise ResourceNotFoundException if the given table does not exist.
1921

1922
        :param account_id: account id
1923
        :param region_name: region name
1924
        :param table_name: table name
1925
        :raise: ResourceNotFoundException if table does not exist in DynamoDB Local
1926
        """
1927
        if not DynamoDBProvider.table_exists(account_id, region_name, table_name):
1✔
1928
            raise ResourceNotFoundException(error_message)
1✔
1929

1930
    @staticmethod
1✔
1931
    def get_global_table_region(context: RequestContext, table_name: str) -> str:
1✔
1932
        """
1933
        Return the table region considering that it might be a replicated table.
1934

1935
        Replication in LocalStack works by keeping a single copy of a table and forwarding
1936
        requests to the region where this table exists.
1937

1938
        This method does not check whether the table actually exists in DDBLocal.
1939

1940
        :param context: request context
1941
        :param table_name: table name
1942
        :return: region
1943
        """
1944
        store = get_store(context.account_id, context.region)
1✔
1945

1946
        table_region = store.TABLE_REGION.get(table_name)
1✔
1947
        replicated_at = store.REPLICAS.get(table_name, {}).keys()
1✔
1948

1949
        if context.region == table_region or context.region in replicated_at:
1✔
1950
            return table_region
1✔
1951

1952
        return context.region
1✔
1953

1954
    @staticmethod
1✔
1955
    def prepare_request_headers(headers: dict, account_id: str, region_name: str):
1✔
1956
        """
1957
        Modify the Credentials field of Authorization header to achieve namespacing in DynamoDBLocal.
1958
        """
1959
        region_name = DynamoDBProvider.ddb_region_name(region_name)
1✔
1960
        key = get_ddb_access_key(account_id, region_name)
1✔
1961

1962
        # DynamoDBLocal namespaces based on the value of Credentials
1963
        # Since we want to namespace by both account ID and region, use an aggregate key
1964
        # We also replace the region to keep compatibility with NoSQL Workbench
1965
        headers["Authorization"] = re.sub(
1✔
1966
            AUTH_CREDENTIAL_REGEX,
1967
            rf"Credential={key}/\2/{region_name}/\4/",
1968
            headers.get("Authorization") or "",
1969
            flags=re.IGNORECASE,
1970
        )
1971

1972
    def fix_consumed_capacity(self, request: dict, result: dict):
1✔
1973
        # make sure we append 'ConsumedCapacity', which is properly
1974
        # returned by dynalite, but not by AWS's DynamoDBLocal
1975
        table_name = request.get("TableName")
1✔
1976
        return_cap = request.get("ReturnConsumedCapacity")
1✔
1977
        if "ConsumedCapacity" not in result and return_cap in ["TOTAL", "INDEXES"]:
1✔
UNCOV
1978
            request["ConsumedCapacity"] = {
×
1979
                "TableName": table_name,
1980
                "CapacityUnits": 5,  # TODO hardcoded
1981
                "ReadCapacityUnits": 2,
1982
                "WriteCapacityUnits": 3,
1983
            }
1984

1985
    def fix_table_arn(self, account_id: str, region_name: str, arn: str) -> str:
1✔
1986
        """
1987
        Set the correct account ID and region in ARNs returned by DynamoDB Local.
1988
        """
1989
        partition = get_partition(region_name)
1✔
1990
        return (
1✔
1991
            arn.replace("arn:aws:", f"arn:{partition}:")
1992
            .replace(":ddblocal:", f":{region_name}:")
1993
            .replace(":000000000000:", f":{account_id}:")
1994
        )
1995

1996
    def prepare_transact_write_item_records(
1✔
1997
        self,
1998
        account_id: str,
1999
        region_name: str,
2000
        transact_items: TransactWriteItemList,
2001
        existing_items: BatchGetResponseMap,
2002
        updated_items: BatchGetResponseMap,
2003
        tables_stream_type: dict[TableName, TableStreamType],
2004
    ) -> RecordsMap:
2005
        records_only_map: dict[TableName, StreamRecords] = defaultdict(list)
1✔
2006

2007
        for request in transact_items:
1✔
2008
            record = self.get_record_template(region_name)
1✔
2009
            match request:
1✔
2010
                case {"Put": {"TableName": table_name, "Item": new_item}}:
1✔
2011
                    if not (stream_type := tables_stream_type.get(table_name)):
1✔
2012
                        continue
1✔
2013
                    keys = SchemaExtractor.extract_keys(
1✔
2014
                        item=new_item,
2015
                        table_name=table_name,
2016
                        account_id=account_id,
2017
                        region_name=region_name,
2018
                    )
2019
                    existing_item = find_item_for_keys_values_in_batch(
1✔
2020
                        table_name, keys, existing_items
2021
                    )
2022
                    if existing_item == new_item:
1✔
2023
                        continue
1✔
2024

2025
                    if stream_type.stream_view_type:
1✔
2026
                        record["dynamodb"]["StreamViewType"] = stream_type.stream_view_type
1✔
2027

2028
                    record["eventID"] = short_uid()
1✔
2029
                    record["eventName"] = "INSERT" if not existing_item else "MODIFY"
1✔
2030
                    record["dynamodb"]["Keys"] = keys
1✔
2031
                    if stream_type.needs_new_image:
1✔
2032
                        record["dynamodb"]["NewImage"] = new_item
1✔
2033
                    if existing_item and stream_type.needs_old_image:
1✔
2034
                        record["dynamodb"]["OldImage"] = existing_item
1✔
2035

2036
                    record_item = de_dynamize_record(new_item)
1✔
2037
                    record["dynamodb"]["SizeBytes"] = _get_size_bytes(record_item)
1✔
2038
                    records_only_map[table_name].append(record)
1✔
2039
                    continue
1✔
2040

2041
                case {"Update": {"TableName": table_name, "Key": keys}}:
1✔
2042
                    if not (stream_type := tables_stream_type.get(table_name)):
1✔
UNCOV
2043
                        continue
×
2044
                    updated_item = find_item_for_keys_values_in_batch(
1✔
2045
                        table_name, keys, updated_items
2046
                    )
2047
                    if not updated_item:
1✔
UNCOV
2048
                        continue
×
2049

2050
                    existing_item = find_item_for_keys_values_in_batch(
1✔
2051
                        table_name, keys, existing_items
2052
                    )
2053
                    if existing_item == updated_item:
1✔
2054
                        # if the item is the same as the previous version, AWS does not send an event
2055
                        continue
1✔
2056

2057
                    if stream_type.stream_view_type:
1✔
2058
                        record["dynamodb"]["StreamViewType"] = stream_type.stream_view_type
1✔
2059

2060
                    record["eventID"] = short_uid()
1✔
2061
                    record["eventName"] = "MODIFY" if existing_item else "INSERT"
1✔
2062
                    record["dynamodb"]["Keys"] = keys
1✔
2063

2064
                    if existing_item and stream_type.needs_old_image:
1✔
2065
                        record["dynamodb"]["OldImage"] = existing_item
1✔
2066
                    if stream_type.needs_new_image:
1✔
2067
                        record["dynamodb"]["NewImage"] = updated_item
1✔
2068

2069
                    record["dynamodb"]["SizeBytes"] = _get_size_bytes(updated_item)
1✔
2070
                    records_only_map[table_name].append(record)
1✔
2071
                    continue
1✔
2072

2073
                case {"Delete": {"TableName": table_name, "Key": keys}}:
1✔
2074
                    if not (stream_type := tables_stream_type.get(table_name)):
1✔
UNCOV
2075
                        continue
×
2076

2077
                    existing_item = find_item_for_keys_values_in_batch(
1✔
2078
                        table_name, keys, existing_items
2079
                    )
2080
                    if not existing_item:
1✔
UNCOV
2081
                        continue
×
2082

2083
                    if stream_type.stream_view_type:
1✔
2084
                        record["dynamodb"]["StreamViewType"] = stream_type.stream_view_type
1✔
2085

2086
                    record["eventID"] = short_uid()
1✔
2087
                    record["eventName"] = "REMOVE"
1✔
2088
                    record["dynamodb"]["Keys"] = keys
1✔
2089
                    if stream_type.needs_old_image:
1✔
2090
                        record["dynamodb"]["OldImage"] = existing_item
1✔
2091
                    record_item = de_dynamize_record(existing_item)
1✔
2092
                    record["dynamodb"]["SizeBytes"] = _get_size_bytes(record_item)
1✔
2093

2094
                    records_only_map[table_name].append(record)
1✔
2095
                    continue
1✔
2096

2097
        records_map = {
1✔
2098
            table_name: TableRecords(
2099
                records=records, table_stream_type=tables_stream_type[table_name]
2100
            )
2101
            for table_name, records in records_only_map.items()
2102
        }
2103

2104
        return records_map
1✔
2105

2106
    def batch_execute_statement(
1✔
2107
        self,
2108
        context: RequestContext,
2109
        statements: PartiQLBatchRequest,
2110
        return_consumed_capacity: ReturnConsumedCapacity = None,
2111
        **kwargs,
2112
    ) -> BatchExecuteStatementOutput:
2113
        result = self.forward_request(context)
1✔
2114
        return result
1✔
2115

2116
    def prepare_batch_write_item_records(
1✔
2117
        self,
2118
        account_id: str,
2119
        region_name: str,
2120
        tables_stream_type: dict[TableName, TableStreamType],
2121
        request_items: BatchWriteItemRequestMap,
2122
        existing_items: BatchGetResponseMap,
2123
    ) -> RecordsMap:
2124
        records_map: RecordsMap = {}
1✔
2125

2126
        # only iterate over tables with streams
2127
        for table_name, stream_type in tables_stream_type.items():
1✔
2128
            existing_items_for_table_unordered = existing_items.get(table_name, [])
1✔
2129
            table_records: StreamRecords = []
1✔
2130

2131
            def find_existing_item_for_keys_values(item_keys: dict) -> AttributeMap | None:
1✔
2132
                """
2133
                This function looks up in the existing items for the provided item keys subset. If present, returns the
2134
                full item.
2135
                :param item_keys: the request item keys
2136
                :return:
2137
                """
2138
                keys_items = item_keys.items()
1✔
2139
                for item in existing_items_for_table_unordered:
1✔
2140
                    if keys_items <= item.items():
1✔
2141
                        return item
1✔
2142

2143
            for write_request in request_items[table_name]:
1✔
2144
                record = self.get_record_template(
1✔
2145
                    region_name,
2146
                    stream_view_type=stream_type.stream_view_type,
2147
                )
2148
                match write_request:
1✔
2149
                    case {"PutRequest": request}:
1✔
2150
                        keys = SchemaExtractor.extract_keys(
1✔
2151
                            item=request["Item"],
2152
                            table_name=table_name,
2153
                            account_id=account_id,
2154
                            region_name=region_name,
2155
                        )
2156
                        # we need to find if there was an existing item even if we don't need it for `OldImage`, because
2157
                        # of the `eventName`
2158
                        existing_item = find_existing_item_for_keys_values(keys)
1✔
2159
                        if existing_item == request["Item"]:
1✔
2160
                            # if the item is the same as the previous version, AWS does not send an event
2161
                            continue
1✔
2162
                        record["eventID"] = short_uid()
1✔
2163
                        record["dynamodb"]["SizeBytes"] = _get_size_bytes(request["Item"])
1✔
2164
                        record["eventName"] = "INSERT" if not existing_item else "MODIFY"
1✔
2165
                        record["dynamodb"]["Keys"] = keys
1✔
2166

2167
                        if stream_type.needs_new_image:
1✔
2168
                            record["dynamodb"]["NewImage"] = request["Item"]
1✔
2169
                        if existing_item and stream_type.needs_old_image:
1✔
2170
                            record["dynamodb"]["OldImage"] = existing_item
1✔
2171

2172
                        table_records.append(record)
1✔
2173
                        continue
1✔
2174

2175
                    case {"DeleteRequest": request}:
1✔
2176
                        keys = request["Key"]
1✔
2177
                        if not (existing_item := find_existing_item_for_keys_values(keys)):
1✔
UNCOV
2178
                            continue
×
2179

2180
                        record["eventID"] = short_uid()
1✔
2181
                        record["eventName"] = "REMOVE"
1✔
2182
                        record["dynamodb"]["Keys"] = keys
1✔
2183
                        if stream_type.needs_old_image:
1✔
2184
                            record["dynamodb"]["OldImage"] = existing_item
1✔
2185
                        record["dynamodb"]["SizeBytes"] = _get_size_bytes(existing_item)
1✔
2186
                        table_records.append(record)
1✔
2187
                        continue
1✔
2188

2189
            records_map[table_name] = TableRecords(
1✔
2190
                records=table_records, table_stream_type=stream_type
2191
            )
2192

2193
        return records_map
1✔
2194

2195
    def forward_stream_records(
1✔
2196
        self,
2197
        account_id: str,
2198
        region_name: str,
2199
        records_map: RecordsMap,
2200
    ) -> None:
2201
        if not records_map:
1✔
UNCOV
2202
            return
×
2203

2204
        self._event_forwarder.forward_to_targets(
1✔
2205
            account_id, region_name, records_map, background=True
2206
        )
2207

2208
    @staticmethod
1✔
2209
    def get_record_template(region_name: str, stream_view_type: str | None = None) -> StreamRecord:
1✔
2210
        record = {
1✔
2211
            "eventID": short_uid(),
2212
            "eventVersion": "1.1",
2213
            "dynamodb": {
2214
                # expects nearest second rounded down
2215
                "ApproximateCreationDateTime": int(time.time()),
2216
                "SizeBytes": -1,
2217
            },
2218
            "awsRegion": region_name,
2219
            "eventSource": "aws:dynamodb",
2220
        }
2221
        if stream_view_type:
1✔
2222
            record["dynamodb"]["StreamViewType"] = stream_view_type
1✔
2223

2224
        return record
1✔
2225

2226
    def check_provisioned_throughput(self, action):
1✔
2227
        """
2228
        Check rate limiting for an API operation and raise an error if provisioned throughput is exceeded.
2229
        """
2230
        if self.should_throttle(action):
1✔
2231
            message = (
1✔
2232
                "The level of configured provisioned throughput for the table was exceeded. "
2233
                + "Consider increasing your provisioning level with the UpdateTable API"
2234
            )
2235
            raise ProvisionedThroughputExceededException(message)
1✔
2236

2237
    def action_should_throttle(self, action, actions):
1✔
2238
        throttled = [f"{ACTION_PREFIX}{a}" for a in actions]
1✔
2239
        return (action in throttled) or (action in actions)
1✔
2240

2241
    def should_throttle(self, action):
1✔
2242
        if (
1✔
2243
            not config.DYNAMODB_READ_ERROR_PROBABILITY
2244
            and not config.DYNAMODB_ERROR_PROBABILITY
2245
            and not config.DYNAMODB_WRITE_ERROR_PROBABILITY
2246
        ):
2247
            # early exit so we don't need to call random()
2248
            return False
1✔
2249

2250
        rand = random.random()
1✔
2251
        if rand < config.DYNAMODB_READ_ERROR_PROBABILITY and self.action_should_throttle(
1✔
2252
            action, READ_THROTTLED_ACTIONS
2253
        ):
2254
            return True
1✔
2255
        elif rand < config.DYNAMODB_WRITE_ERROR_PROBABILITY and self.action_should_throttle(
1✔
2256
            action, WRITE_THROTTLED_ACTIONS
2257
        ):
2258
            return True
1✔
2259
        elif rand < config.DYNAMODB_ERROR_PROBABILITY and self.action_should_throttle(
1✔
2260
            action, THROTTLED_ACTIONS
2261
        ):
2262
            return True
1✔
2263
        return False
1✔
2264

2265

2266
# ---
2267
# Misc. util functions
2268
# ---
2269

2270

2271
def _get_size_bytes(item: dict) -> int:
1✔
2272
    try:
1✔
2273
        size_bytes = len(json.dumps(item, separators=(",", ":")))
1✔
2274
    except TypeError:
1✔
2275
        size_bytes = len(str(item))
1✔
2276
    return size_bytes
1✔
2277

2278

2279
def get_global_secondary_index(account_id: str, region_name: str, table_name: str, index_name: str):
1✔
2280
    schema = SchemaExtractor.get_table_schema(table_name, account_id, region_name)
1✔
2281
    for index in schema["Table"].get("GlobalSecondaryIndexes", []):
1✔
2282
        if index["IndexName"] == index_name:
1✔
2283
            return index
1✔
UNCOV
2284
    raise ResourceNotFoundException("Index not found")
×
2285

2286

2287
def is_local_secondary_index(
1✔
2288
    account_id: str, region_name: str, table_name: str, index_name: str
2289
) -> bool:
2290
    schema = SchemaExtractor.get_table_schema(table_name, account_id, region_name)
1✔
2291
    for index in schema["Table"].get("LocalSecondaryIndexes", []):
1✔
2292
        if index["IndexName"] == index_name:
1✔
2293
            return True
1✔
2294
    return False
1✔
2295

2296

2297
def is_index_query_valid(account_id: str, region_name: str, query_data: dict) -> bool:
1✔
2298
    table_name = to_str(query_data["TableName"])
1✔
2299
    index_name = to_str(query_data["IndexName"])
1✔
2300
    if is_local_secondary_index(account_id, region_name, table_name, index_name):
1✔
2301
        return True
1✔
2302
    index_query_type = query_data.get("Select")
1✔
2303
    index = get_global_secondary_index(account_id, region_name, table_name, index_name)
1✔
2304
    index_projection_type = index.get("Projection").get("ProjectionType")
1✔
2305
    if index_query_type == "ALL_ATTRIBUTES" and index_projection_type != "ALL":
1✔
2306
        return False
1✔
2307
    return True
1✔
2308

2309

2310
def get_table_stream_type(
1✔
2311
    account_id: str, region_name: str, table_name_or_arn: str
2312
) -> TableStreamType | None:
2313
    """
2314
    :param account_id: the account id of the table
2315
    :param region_name: the region of the table
2316
    :param table_name_or_arn: the table name or ARN
2317
    :return: a TableStreamViewType object if the table has streams enabled. If not, return None
2318
    """
2319
    if not table_name_or_arn:
1✔
UNCOV
2320
        return
×
2321

2322
    table_name = table_name_or_arn.split(":table/")[-1]
1✔
2323

2324
    is_kinesis = False
1✔
2325
    stream_view_type = None
1✔
2326

2327
    if table_definition := get_store(account_id, region_name).table_definitions.get(table_name):
1✔
2328
        if table_definition.get("KinesisDataStreamDestinationStatus") == "ACTIVE":
1✔
2329
            is_kinesis = True
1✔
2330

2331
    table_arn = arns.dynamodb_table_arn(table_name, account_id=account_id, region_name=region_name)
1✔
2332

2333
    if (
1✔
2334
        stream := dynamodbstreams_api.get_stream_for_table(account_id, region_name, table_arn)
2335
    ) and stream["StreamStatus"] in (StreamStatus.ENABLING, StreamStatus.ENABLED):
2336
        stream_view_type = stream["StreamViewType"]
1✔
2337

2338
    if is_kinesis or stream_view_type:
1✔
2339
        return TableStreamType(stream_view_type, is_kinesis=is_kinesis)
1✔
2340

2341

2342
def get_updated_records(
1✔
2343
    account_id: str,
2344
    region_name: str,
2345
    table_name: str,
2346
    existing_items: list,
2347
    server_url: str,
2348
    table_stream_type: TableStreamType,
2349
) -> RecordsMap:
2350
    """
2351
    Determine the list of record updates, to be sent to a DDB stream after a PartiQL update operation.
2352

2353
    Note: This is currently a fairly expensive operation, as we need to retrieve the list of all items
2354
          from the table, and compare the items to the previously available. This is a limitation as
2355
          we're currently using the DynamoDB Local backend as a blackbox. In future, we should consider hooking
2356
          into the PartiQL query execution inside DynamoDB Local and directly extract the list of updated items.
2357
    """
2358
    result = []
1✔
2359

2360
    key_schema = SchemaExtractor.get_key_schema(table_name, account_id, region_name)
1✔
2361
    before = ItemSet(existing_items, key_schema=key_schema)
1✔
2362
    all_table_items = ItemFinder.get_all_table_items(
1✔
2363
        account_id=account_id,
2364
        region_name=region_name,
2365
        table_name=table_name,
2366
        endpoint_url=server_url,
2367
    )
2368
    after = ItemSet(all_table_items, key_schema=key_schema)
1✔
2369

2370
    def _add_record(item, comparison_set: ItemSet):
1✔
2371
        matching_item = comparison_set.find_item(item)
1✔
2372
        if matching_item == item:
1✔
UNCOV
2373
            return
×
2374

2375
        # determine event type
2376
        if comparison_set == after:
1✔
2377
            if matching_item:
1✔
2378
                return
1✔
2379
            event_name = "REMOVE"
1✔
2380
        else:
2381
            event_name = "INSERT" if not matching_item else "MODIFY"
1✔
2382

2383
        old_image = item if event_name == "REMOVE" else matching_item
1✔
2384
        new_image = matching_item if event_name == "REMOVE" else item
1✔
2385

2386
        # prepare record
2387
        keys = SchemaExtractor.extract_keys_for_schema(item=item, key_schema=key_schema)
1✔
2388

2389
        record = DynamoDBProvider.get_record_template(region_name)
1✔
2390
        record["eventName"] = event_name
1✔
2391
        record["dynamodb"]["Keys"] = keys
1✔
2392
        record["dynamodb"]["SizeBytes"] = _get_size_bytes(item)
1✔
2393

2394
        if table_stream_type.stream_view_type:
1✔
2395
            record["dynamodb"]["StreamViewType"] = table_stream_type.stream_view_type
1✔
2396
        if table_stream_type.needs_new_image:
1✔
UNCOV
2397
            record["dynamodb"]["NewImage"] = new_image
×
2398
        if old_image and table_stream_type.needs_old_image:
1✔
UNCOV
2399
            record["dynamodb"]["OldImage"] = old_image
×
2400

2401
        result.append(record)
1✔
2402

2403
    # loop over items in new item list (find INSERT/MODIFY events)
2404
    for item in after.items_list:
1✔
2405
        _add_record(item, before)
1✔
2406
    # loop over items in old item list (find REMOVE events)
2407
    for item in before.items_list:
1✔
2408
        _add_record(item, after)
1✔
2409

2410
    return {table_name: TableRecords(records=result, table_stream_type=table_stream_type)}
1✔
2411

2412

2413
def create_dynamodb_stream(account_id: str, region_name: str, data, latest_stream_label):
1✔
2414
    stream = data["StreamSpecification"]
1✔
2415
    enabled = stream.get("StreamEnabled")
1✔
2416

2417
    if enabled not in [False, "False"]:
1✔
2418
        table_name = data["TableName"]
1✔
2419
        view_type = stream["StreamViewType"]
1✔
2420

2421
        dynamodbstreams_api.add_dynamodb_stream(
1✔
2422
            account_id=account_id,
2423
            region_name=region_name,
2424
            table_name=table_name,
2425
            latest_stream_label=latest_stream_label,
2426
            view_type=view_type,
2427
            enabled=enabled,
2428
        )
2429

2430

2431
def dynamodb_get_table_stream_specification(account_id: str, region_name: str, table_name: str):
1✔
UNCOV
2432
    try:
×
UNCOV
2433
        table_schema = SchemaExtractor.get_table_schema(
×
2434
            table_name, account_id=account_id, region_name=region_name
2435
        )
UNCOV
2436
        return table_schema["Table"].get("StreamSpecification")
×
UNCOV
2437
    except Exception as e:
×
UNCOV
2438
        LOG.info(
×
2439
            "Unable to get stream specification for table %s: %s %s",
2440
            table_name,
2441
            e,
2442
            traceback.format_exc(),
2443
        )
UNCOV
2444
        raise e
×
2445

2446

2447
def find_item_for_keys_values_in_batch(
1✔
2448
    table_name: str, item_keys: dict, batch: BatchGetResponseMap
2449
) -> AttributeMap | None:
2450
    """
2451
    This function looks up in the existing items for the provided item keys subset. If present, returns the
2452
    full item.
2453
    :param table_name: the table name for the item
2454
    :param item_keys: the request item keys
2455
    :param batch: the values in which to look for the item
2456
    :return: a DynamoDB Item (AttributeMap)
2457
    """
2458
    keys = item_keys.items()
1✔
2459
    for item in batch.get(table_name, []):
1✔
2460
        if keys <= item.items():
1✔
2461
            return item
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc