• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20565403496

29 Dec 2025 05:11AM UTC coverage: 84.103% (-2.8%) from 86.921%
20565403496

Pull #13567

github

web-flow
Merge 4816837a5 into 2417384aa
Pull Request #13567: Update ASF APIs

67166 of 79862 relevant lines covered (84.1%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

47.62
/localstack-core/localstack/services/dynamodbstreams/v2/provider.py
1
import logging
1✔
2

3
from localstack.aws import handlers
1✔
4
from localstack.aws.api import RequestContext, ServiceRequest, ServiceResponse, handler
1✔
5
from localstack.aws.api.dynamodbstreams import (
1✔
6
    DescribeStreamInput,
7
    DescribeStreamOutput,
8
    DynamodbstreamsApi,
9
    GetRecordsInput,
10
    GetRecordsOutput,
11
    GetShardIteratorInput,
12
    GetShardIteratorOutput,
13
    ListStreamsInput,
14
    ListStreamsOutput,
15
)
16
from localstack.services.dynamodb.server import DynamodbServer
1✔
17
from localstack.services.dynamodb.utils import modify_ddblocal_arns
1✔
18
from localstack.services.dynamodb.v2.provider import DynamoDBProvider, modify_context_region
1✔
19
from localstack.services.dynamodbstreams.dynamodbstreams_api import get_original_region
1✔
20
from localstack.services.plugins import ServiceLifecycleHook
1✔
21
from localstack.state import StateVisitor
1✔
22
from localstack.utils.aws.arns import parse_arn
1✔
23

24
LOG = logging.getLogger(__name__)
1✔
25

26

27
class DynamoDBStreamsProvider(DynamodbstreamsApi, ServiceLifecycleHook):
1✔
28
    shard_to_region: dict[str, str]
1✔
29
    """Map a shard iterator to the originating region. This is used in case of replica tables, as LocalStack keeps the
1✔
30
    data in one region only, redirecting all the requests from replica regions."""
31

32
    def __init__(self):
1✔
33
        self.server = DynamodbServer.get()
×
34
        self.shard_to_region = {}
×
35

36
    def accept_state_visitor(self, visitor: StateVisitor):
1✔
37
        # DynamoDB Streams state is entirely dependent on DynamoDB Local state, and does not hold state itself
38
        # the DynamoDB provider is responsible for the persistence of DDB Streams
39
        pass
×
40

41
    def on_after_init(self):
1✔
42
        # add response processor specific to ddblocal
43
        handlers.modify_service_response.append(self.service, modify_ddblocal_arns)
×
44

45
    def on_before_start(self):
1✔
46
        self.server.start_dynamodb()
×
47

48
    def _forward_request(
1✔
49
        self, context: RequestContext, region: str | None, service_request: ServiceRequest
50
    ) -> ServiceResponse:
51
        """
52
        Modify the context region and then forward request to DynamoDB Local.
53

54
        This is used for operations impacted by global tables. In LocalStack, a single copy of global table
55
        is kept, and any requests to replicated tables are forwarded to this original table.
56
        """
57
        if region:
×
58
            with modify_context_region(context, region):
×
59
                return self.forward_request(context, service_request=service_request)
×
60
        return self.forward_request(context, service_request=service_request)
×
61

62
    def forward_request(
1✔
63
        self, context: RequestContext, service_request: ServiceRequest = None
64
    ) -> ServiceResponse:
65
        """
66
        Forward a request to DynamoDB Local.
67
        """
68
        DynamoDBProvider.prepare_request_headers(
×
69
            context.request.headers, account_id=context.account_id, region_name=context.region
70
        )
71
        return self.server.proxy(context, service_request)
×
72

73
    def modify_stream_arn_for_ddb_local(self, stream_arn: str) -> str:
1✔
74
        parsed_arn = parse_arn(stream_arn)
×
75

76
        return f"arn:aws:dynamodb:ddblocal:000000000000:{parsed_arn['resource']}"
×
77

78
    @handler("DescribeStream", expand=False)
1✔
79
    def describe_stream(
1✔
80
        self,
81
        context: RequestContext,
82
        payload: DescribeStreamInput,
83
    ) -> DescribeStreamOutput:
84
        global_table_region = get_original_region(context=context, stream_arn=payload["StreamArn"])
×
85
        request = payload.copy()
×
86
        request["StreamArn"] = self.modify_stream_arn_for_ddb_local(request.get("StreamArn", ""))
×
87
        return self._forward_request(
×
88
            context=context, service_request=request, region=global_table_region
89
        )
90

91
    @handler("GetRecords", expand=False)
1✔
92
    def get_records(self, context: RequestContext, payload: GetRecordsInput) -> GetRecordsOutput:
1✔
93
        request = payload.copy()
×
94
        request["ShardIterator"] = self.modify_stream_arn_for_ddb_local(
×
95
            request.get("ShardIterator", "")
96
        )
97
        region = self.shard_to_region.pop(request["ShardIterator"], None)
×
98
        response = self._forward_request(context=context, region=region, service_request=request)
×
99
        # Similar as the logic in GetShardIterator, we need to track the originating region when we get the
100
        # NextShardIterator in the results.
101
        if (
×
102
            region
103
            and region != context.region
104
            and (next_shard := response.get("NextShardIterator"))
105
        ):
106
            self.shard_to_region[next_shard] = region
×
107
        return response
×
108

109
    @handler("GetShardIterator", expand=False)
1✔
110
    def get_shard_iterator(
1✔
111
        self, context: RequestContext, payload: GetShardIteratorInput
112
    ) -> GetShardIteratorOutput:
113
        global_table_region = get_original_region(context=context, stream_arn=payload["StreamArn"])
×
114
        request = payload.copy()
×
115
        request["StreamArn"] = self.modify_stream_arn_for_ddb_local(request.get("StreamArn", ""))
×
116
        response = self._forward_request(
×
117
            context=context, service_request=request, region=global_table_region
118
        )
119

120
        # In case of a replica table, we need to keep track of the real region originating the shard iterator.
121
        # This region will be later used in GetRecords to redirect to the originating region, holding the data.
122
        if global_table_region != context.region and (
×
123
            shard_iterator := response.get("ShardIterator")
124
        ):
125
            self.shard_to_region[shard_iterator] = global_table_region
×
126
        return response
×
127

128
    @handler("ListStreams", expand=False)
1✔
129
    def list_streams(self, context: RequestContext, payload: ListStreamsInput) -> ListStreamsOutput:
1✔
130
        global_table_region = get_original_region(
×
131
            context=context, stream_arn=payload.get("TableName")
132
        )
133
        # TODO: look into `ExclusiveStartStreamArn` param
134
        return self._forward_request(
×
135
            context=context, service_request=payload, region=global_table_region
136
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc