• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20476669022

23 Dec 2025 06:23PM UTC coverage: 86.921% (+0.009%) from 86.912%
20476669022

push

github

web-flow
Fix KMS model annotations (#13563)

2 of 2 new or added lines in 1 file covered. (100.0%)

266 existing lines in 7 files now uncovered.

70055 of 80596 relevant lines covered (86.92%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.78
/localstack-core/localstack/services/kinesis/provider.py
1
import json
1✔
2
import logging
1✔
3
import os
1✔
4
import re
1✔
5
import time
1✔
6
from random import random
1✔
7

8
from localstack import config
1✔
9
from localstack.aws.api import RequestContext
1✔
10
from localstack.aws.api.kinesis import (
1✔
11
    ConsumerARN,
12
    Data,
13
    GetResourcePolicyOutput,
14
    HashKey,
15
    KinesisApi,
16
    PartitionKey,
17
    Policy,
18
    ProvisionedThroughputExceededException,
19
    PutRecordOutput,
20
    PutRecordsOutput,
21
    PutRecordsRequestEntryList,
22
    PutRecordsResultEntry,
23
    ResourceARN,
24
    ResourceNotFoundException,
25
    SequenceNumber,
26
    ShardId,
27
    StartingPosition,
28
    StreamARN,
29
    StreamName,
30
    SubscribeToShardEvent,
31
    SubscribeToShardEventStream,
32
    SubscribeToShardOutput,
33
    ValidationException,
34
)
35
from localstack.aws.connect import connect_to
1✔
36
from localstack.constants import LOCALHOST
1✔
37
from localstack.services.kinesis.kinesis_mock_server import KinesisServerManager
1✔
38
from localstack.services.kinesis.models import KinesisStore, kinesis_stores
1✔
39
from localstack.services.plugins import ServiceLifecycleHook
1✔
40
from localstack.state import AssetDirectory, StateVisitor
1✔
41
from localstack.utils.aws import arns
1✔
42
from localstack.utils.aws.arns import extract_account_id_from_arn, extract_region_from_arn
1✔
43
from localstack.utils.time import now_utc
1✔
44

45
LOG = logging.getLogger(__name__)
1✔
46
MAX_SUBSCRIPTION_SECONDS = 300
1✔
47
SERVER_STARTUP_TIMEOUT = 120
1✔
48

49
DATA_STREAM_ARN_REGEX = re.compile(
1✔
50
    r"^arn:aws(?:-[a-z]+)*:kinesis:[a-z0-9-]+:\d{12}:stream\/[a-zA-Z0-9_.\-]+$"
51
)
52
CONSUMER_ARN_REGEX = re.compile(
1✔
53
    r"^arn:aws(?:-[a-z]+)*:kinesis:[a-z0-9-]+:\d{12}:stream\/[a-zA-Z0-9_.\-]+\/consumer\/[a-zA-Z0-9_.\-]+:\d+$"
54
)
55

56

57
def find_stream_for_consumer(consumer_arn):
1✔
58
    account_id = extract_account_id_from_arn(consumer_arn)
1✔
59
    region_name = extract_region_from_arn(consumer_arn)
1✔
60
    kinesis = connect_to(aws_access_key_id=account_id, region_name=region_name).kinesis
1✔
61
    for stream_name in kinesis.list_streams()["StreamNames"]:
1✔
62
        stream_arn = arns.kinesis_stream_arn(stream_name, account_id, region_name)
1✔
63
        for cons in kinesis.list_stream_consumers(StreamARN=stream_arn)["Consumers"]:
1✔
64
            if cons["ConsumerARN"] == consumer_arn:
1✔
65
                return stream_name
1✔
66
    raise Exception(f"Unable to find stream for stream consumer {consumer_arn}")
×
67

68

69
def is_valid_kinesis_arn(resource_arn: ResourceARN) -> bool:
1✔
70
    """Check if the provided ARN is a valid Kinesis ARN."""
71
    return bool(CONSUMER_ARN_REGEX.match(resource_arn) or DATA_STREAM_ARN_REGEX.match(resource_arn))
1✔
72

73

74
class KinesisProvider(KinesisApi, ServiceLifecycleHook):
1✔
75
    server_manager: KinesisServerManager
1✔
76

77
    def __init__(self):
1✔
78
        self.server_manager = KinesisServerManager()
1✔
79

80
    def accept_state_visitor(self, visitor: StateVisitor):
1✔
81
        visitor.visit(kinesis_stores)
×
82
        visitor.visit(AssetDirectory(self.service, os.path.join(config.dirs.data, "kinesis")))
×
83

84
    def on_before_state_load(self):
1✔
85
        # no need to restart servers, since that happens lazily in `server_manager.get_server_for_account`.
86
        self.server_manager.shutdown_all()
×
87

88
    def on_before_state_reset(self):
1✔
89
        self.server_manager.shutdown_all()
×
90

91
    def on_before_stop(self):
1✔
92
        self.server_manager.shutdown_all()
1✔
93

94
    def get_forward_url(self, account_id: str, region_name: str) -> str:
1✔
95
        """Return the URL of the backend Kinesis server to forward requests to"""
96
        server = self.server_manager.get_server_for_account(account_id)
1✔
97
        return f"http://{LOCALHOST}:{server.port}"
1✔
98

99
    @staticmethod
1✔
100
    def get_store(account_id: str, region_name: str) -> KinesisStore:
1✔
101
        return kinesis_stores[account_id][region_name]
1✔
102

103
    def put_resource_policy(
1✔
104
        self,
105
        context: RequestContext,
106
        resource_arn: ResourceARN,
107
        policy: Policy,
108
        **kwargs,
109
    ) -> None:
110
        if not is_valid_kinesis_arn(resource_arn):
1✔
111
            raise ValidationException(f"invalid kinesis arn {resource_arn}")
×
112

113
        kinesis = connect_to(
1✔
114
            aws_access_key_id=context.account_id, region_name=context.region
115
        ).kinesis
116
        try:
1✔
117
            kinesis.describe_stream_summary(StreamARN=resource_arn)
1✔
118
        except kinesis.exceptions.ResourceNotFoundException:
1✔
119
            raise ResourceNotFoundException(f"Stream with ARN {resource_arn} not found")
1✔
120

121
        store = self.get_store(context.account_id, context.region)
1✔
122
        store.resource_policies[resource_arn] = policy
1✔
123

124
    def get_resource_policy(
1✔
125
        self,
126
        context: RequestContext,
127
        resource_arn: ResourceARN,
128
        **kwargs,
129
    ) -> GetResourcePolicyOutput:
130
        if not is_valid_kinesis_arn(resource_arn):
1✔
131
            raise ValidationException(f"invalid kinesis arn {resource_arn}")
×
132

133
        kinesis = connect_to(
1✔
134
            aws_access_key_id=context.account_id, region_name=context.region
135
        ).kinesis
136
        try:
1✔
137
            kinesis.describe_stream_summary(StreamARN=resource_arn)
1✔
138
        except kinesis.exceptions.ResourceNotFoundException:
×
139
            raise ResourceNotFoundException(f"Stream with ARN {resource_arn} not found")
×
140

141
        store = self.get_store(context.account_id, context.region)
1✔
142
        policy = store.resource_policies.get(resource_arn, json.dumps({}))
1✔
143
        return GetResourcePolicyOutput(Policy=policy)
1✔
144

145
    def delete_resource_policy(
1✔
146
        self,
147
        context: RequestContext,
148
        resource_arn: ResourceARN,
149
        **kwargs,
150
    ) -> None:
151
        if not is_valid_kinesis_arn(resource_arn):
1✔
152
            raise ValidationException(f"invalid kinesis arn {resource_arn}")
×
153

154
        store = self.get_store(context.account_id, context.region)
1✔
155
        if resource_arn not in store.resource_policies:
1✔
156
            raise ResourceNotFoundException(
1✔
157
                f"No resource policy found for resource ARN {resource_arn}"
158
            )
159
        del store.resource_policies[resource_arn]
1✔
160

161
    def subscribe_to_shard(
1✔
162
        self,
163
        context: RequestContext,
164
        consumer_arn: ConsumerARN,
165
        shard_id: ShardId,
166
        starting_position: StartingPosition,
167
        **kwargs,
168
    ) -> SubscribeToShardOutput:
169
        kinesis = connect_to(
1✔
170
            aws_access_key_id=context.account_id, region_name=context.region
171
        ).kinesis
172
        stream_name = find_stream_for_consumer(consumer_arn)
1✔
173
        iter_type = starting_position["Type"]
1✔
174
        kwargs = {}
1✔
175
        starting_sequence_number = starting_position.get("SequenceNumber") or "0"
1✔
176
        if iter_type in ["AT_SEQUENCE_NUMBER", "AFTER_SEQUENCE_NUMBER"]:
1✔
177
            kwargs["StartingSequenceNumber"] = starting_sequence_number
1✔
178
        elif iter_type in ["AT_TIMESTAMP"]:
1✔
179
            # or value is just an example timestamp from aws docs
180
            timestamp = starting_position.get("Timestamp") or 1459799926.480
1✔
181
            kwargs["Timestamp"] = timestamp
1✔
182
        initial_shard_iterator = kinesis.get_shard_iterator(
1✔
183
            StreamName=stream_name, ShardId=shard_id, ShardIteratorType=iter_type, **kwargs
184
        )["ShardIterator"]
185

186
        def event_generator():
1✔
187
            shard_iterator = initial_shard_iterator
1✔
188
            last_sequence_number = starting_sequence_number
1✔
189

190
            maximum_duration_subscription_timestamp = now_utc() + MAX_SUBSCRIPTION_SECONDS
1✔
191

192
            while now_utc() < maximum_duration_subscription_timestamp:
1✔
193
                try:
1✔
194
                    result = kinesis.get_records(ShardIterator=shard_iterator)
1✔
UNCOV
195
                except Exception as e:
×
UNCOV
196
                    if "ResourceNotFoundException" in str(e):
×
UNCOV
197
                        LOG.debug(
×
198
                            'Kinesis stream "%s" has been deleted, closing shard subscriber',
199
                            stream_name,
200
                        )
UNCOV
201
                        return
×
202
                    raise
×
203
                shard_iterator = result.get("NextShardIterator")
1✔
204
                records = result.get("Records", [])
1✔
205
                if records:
1✔
206
                    # Update the last sequence number to the last record's sequence number
207
                    # TODO: This will suffice for now but does not properly capture checkpointing when
208
                    # no data is written to a shard. See AWS docs:
209
                    # https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShardEvent.html#API_SubscribeToShardEvent_Contents
210
                    last_sequence_number = records[-1].get("SequenceNumber", last_sequence_number)
1✔
211
                else:
212
                    # On AWS there is *at least* 1 event every 5 seconds
213
                    # but this is not possible in this structure.
214
                    # In order to avoid a 5-second blocking call, we make the compromise of 3 seconds.
215
                    time.sleep(3)
1✔
216

217
                yield SubscribeToShardEventStream(
1✔
218
                    SubscribeToShardEvent=SubscribeToShardEvent(
219
                        Records=records,
220
                        ContinuationSequenceNumber=str(last_sequence_number),
221
                        MillisBehindLatest=0,
222
                        ChildShards=None,  # TODO: Include shard children info
223
                    )
224
                )
225

226
        return SubscribeToShardOutput(EventStream=event_generator())
1✔
227

228
    def put_record(
1✔
229
        self,
230
        context: RequestContext,
231
        data: Data,
232
        partition_key: PartitionKey,
233
        stream_name: StreamName = None,
234
        explicit_hash_key: HashKey = None,
235
        sequence_number_for_ordering: SequenceNumber = None,
236
        stream_arn: StreamARN = None,
237
        **kwargs,
238
    ) -> PutRecordOutput:
239
        # TODO: Ensure use of `stream_arn` works. Currently kinesis-mock only works with ctx request account ID and region
240
        if random() < config.KINESIS_ERROR_PROBABILITY:
1✔
241
            raise ProvisionedThroughputExceededException(
×
242
                "Rate exceeded for shard X in stream Y under account Z."
243
            )
244
        # If "we were lucky" and the error probability didn't hit, we raise a NotImplementedError in order to
245
        # trigger the fallback to kinesis-mock
246
        raise NotImplementedError
247

248
    def put_records(
1✔
249
        self,
250
        context: RequestContext,
251
        records: PutRecordsRequestEntryList,
252
        stream_name: StreamName = None,
253
        stream_arn: StreamARN = None,
254
        **kwargs,
255
    ) -> PutRecordsOutput:
256
        # TODO: Ensure use of `stream_arn` works. Currently kinesis-mock only works with ctx request account ID and region
257
        if random() < config.KINESIS_ERROR_PROBABILITY:
1✔
258
            records_count = len(records) if records is not None else 0
1✔
259
            records = [
1✔
260
                PutRecordsResultEntry(
261
                    ErrorCode="ProvisionedThroughputExceededException",
262
                    ErrorMessage="Rate exceeded for shard X in stream Y under account Z.",
263
                )
264
            ] * records_count
265
            return PutRecordsOutput(FailedRecordCount=1, Records=records)
1✔
266
        # If "we were lucky" and the error probability didn't hit, we raise a NotImplementedError in order to
267
        # trigger the fallback to kinesis-mock
268
        raise NotImplementedError
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc