• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 7056d8d9-7428-4543-abf2-c5dedb2c98c0

30 May 2025 02:03PM UTC coverage: 86.654% (-0.003%) from 86.657%
7056d8d9-7428-4543-abf2-c5dedb2c98c0

push

circleci

web-flow
Add stack option for CLI start command (#12675)

Co-authored-by: Silvio Vasiljevic <silvio.vasiljevic@gmail.com>
Co-authored-by: Erudit Morina <83708693+eruditmorina@users.noreply.github.com>

2 of 9 new or added lines in 1 file covered. (22.22%)

37 existing lines in 4 files now uncovered.

64641 of 74597 relevant lines covered (86.65%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.65
/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/kinesis_poller.py
1
import base64
1✔
2
import json
1✔
3
import logging
1✔
4
from copy import deepcopy
1✔
5
from datetime import datetime
1✔
6

7
from botocore.client import BaseClient
1✔
8

9
from localstack.aws.api.kinesis import StreamStatus
1✔
10
from localstack.aws.api.pipes import (
1✔
11
    KinesisStreamStartPosition,
12
)
13
from localstack.services.lambda_.event_source_mapping.event_processor import (
1✔
14
    EventProcessor,
15
)
16
from localstack.services.lambda_.event_source_mapping.pollers.stream_poller import StreamPoller
1✔
17
from localstack.utils.strings import to_str
1✔
18

19
# Module-level logger used by the poller below for warnings/debug output.
LOG = logging.getLogger(__name__)
20

21

22
class KinesisPoller(StreamPoller):
    """Stream poller that reads records from a Kinesis stream and converts them
    into event payloads for Lambda Event Source Mappings (ESM) and EventBridge Pipes.

    The payload layout differs between the two consumers: ESM nests record fields
    under a ``kinesis`` key, Pipes keeps them at the top level (see
    ``kinesis_namespace`` below).
    """

    # The role ARN of the processor (e.g., role ARN of the Pipe)
    invoke_identity_arn: str | None
    # Flag to enable nested kinesis namespace when formatting events to support the nested `kinesis` field structure
    # used for Lambda ESM: https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-event-example
    # EventBridge Pipes uses no nesting: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-kinesis.html
    kinesis_namespace: bool

    def __init__(
        self,
        source_arn: str,
        source_parameters: dict | None = None,
        source_client: BaseClient | None = None,
        processor: EventProcessor | None = None,
        partner_resource_arn: str | None = None,
        invoke_identity_arn: str | None = None,
        kinesis_namespace: bool = False,
        esm_uuid: str | None = None,
        shards: dict[str, str] | None = None,
    ):
        super().__init__(
            source_arn,
            source_parameters,
            source_client,
            processor,
            esm_uuid=esm_uuid,
            partner_resource_arn=partner_resource_arn,
            shards=shards,
        )
        self.invoke_identity_arn = invoke_identity_arn
        self.kinesis_namespace = kinesis_namespace

    @property
    def stream_parameters(self) -> dict:
        """Return the Kinesis-specific parameters from the source configuration."""
        return self.source_parameters["KinesisStreamParameters"]

    def initialize_shards(self) -> dict[str, str]:
        """Return a mapping of shard id => shard iterator for every shard of the stream.

        Returns an empty dict (after logging a warning) if the stream is not ACTIVE.
        """
        # TODO: cache this and update/re-try upon failures
        stream_info = self.source_client.describe_stream(StreamARN=self.source_arn)
        stream_status = stream_info["StreamDescription"]["StreamStatus"]
        if stream_status != StreamStatus.ACTIVE:
            LOG.warning(
                "Stream %s is not active. Current status: %s",
                self.source_arn,
                stream_status,
            )
            return {}

        # NOTICE: re-sharding might require updating this periodically (unknown how Pipes does it!?)
        # Mapping of shard id => shard iterator
        shards = {}
        for shard in stream_info["StreamDescription"]["Shards"]:
            shard_id = shard["ShardId"]
            starting_position = self.stream_parameters["StartingPosition"]
            kwargs = {}
            # TODO: test StartingPosition=AT_TIMESTAMP (only supported for Kinesis!)
            if starting_position == KinesisStreamStartPosition.AT_TIMESTAMP:
                # get_shard_iterator takes the start time under the `Timestamp` key for
                # ShardIteratorType=AT_TIMESTAMP; `StartingSequenceNumber` is only valid
                # for the AT/AFTER_SEQUENCE_NUMBER iterator types.
                kwargs["Timestamp"] = self.stream_parameters["StartingPositionTimestamp"]
            get_shard_iterator_response = self.source_client.get_shard_iterator(
                StreamARN=self.source_arn,
                ShardId=shard_id,
                ShardIteratorType=starting_position,
                **kwargs,
            )
            shards[shard_id] = get_shard_iterator_response["ShardIterator"]

        # Log the freshly built mapping (not the possibly stale/empty `self.shards`).
        LOG.debug("Event source %s has %d shards.", self.source_arn, len(shards))
        return shards

    def stream_arn_param(self) -> dict:
        """Return the keyword argument identifying this stream in Kinesis API calls."""
        return {"StreamARN": self.source_arn}

    def event_source(self) -> str:
        """Return the `eventSource` identifier used in generated event payloads."""
        return "aws:kinesis"

    def extra_metadata(self) -> dict:
        """Return the Kinesis-specific metadata fields added to each event."""
        return {
            "eventVersion": "1.0",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": self.invoke_identity_arn,
        }

    def transform_into_events(self, records: list[dict], shard_id) -> list[dict]:
        """Convert raw `get_records` results into event dicts for the processor.

        Record fields are nested under a `kinesis` key for ESM, or merged into the
        top level for Pipes, depending on `self.kinesis_namespace`.
        """
        events = []
        for record in records:
            # TODO: consolidate with Kinesis event source listener:
            #  localstack.services.lambda_.event_source_listeners.kinesis_event_source_listener.KinesisEventSourceListener._create_lambda_event_payload
            #  check `encryptionType` leading to serialization errors by Dotnet Lambdas
            sequence_number = record["SequenceNumber"]
            event = {
                # TODO: add this metadata after filtering.
                #  This requires some design adjustment because the sequence number depends on the record.
                "eventID": f"{shard_id}:{sequence_number}",
            }
            kinesis_fields = {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": record["PartitionKey"],
                "sequenceNumber": sequence_number,
                # TODO: implement heuristic based on content type: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-filtering.html#pipes-filter-sqs
                # boto3 automatically decodes records in get_records(), so we must re-encode
                "data": to_str(base64.b64encode(record["Data"])),
                "approximateArrivalTimestamp": record["ApproximateArrivalTimestamp"].timestamp(),
            }
            if self.kinesis_namespace:
                event["kinesis"] = kinesis_fields
            else:
                event.update(kinesis_fields)
            events.append(event)
        return events

    def failure_payload_details_field_name(self) -> str:
        """Return the field name used for batch info in failure payloads."""
        return "KinesisBatchInfo"

    def get_approximate_arrival_time(self, record: dict) -> float:
        """Return the record's approximate arrival time as a POSIX timestamp."""
        if self.kinesis_namespace:
            return record["kinesis"]["approximateArrivalTimestamp"]
        else:
            return record["approximateArrivalTimestamp"]

    def format_datetime(self, time: datetime) -> str:
        """Format a datetime with millisecond precision and a literal "Z" suffix.

        NOTE(review): the "Z" is appended unconditionally; assumes a naive UTC
        datetime — a tz-aware value would already carry an offset in isoformat().
        """
        return f"{time.isoformat(timespec='milliseconds')}Z"

    def get_sequence_number(self, record: dict) -> str:
        """Return the record's sequence number, honoring the namespace layout."""
        if self.kinesis_namespace:
            return record["kinesis"]["sequenceNumber"]
        else:
            return record["sequenceNumber"]

    def pre_filter(self, events: list[dict]) -> list[dict]:
        """Prepare events for filtering: when a data filter exists, decode and JSON-parse
        each record's payload into a top-level "data" field; non-JSON records are dropped
        with a warning. Without data filters, events pass through unchanged.
        """
        # TODO: test what happens with a mixture of data and non-data filters?
        if has_data_filter_criteria_parsed(self.filter_patterns):
            parsed_events = []
            for event in events:
                raw_data = self.get_data(event)
                try:
                    data = self.parse_data(raw_data)
                    # TODO: test "data" key remapping
                    # Filtering remaps "kinesis.data" in ESM to "data (idempotent for Pipes using "data" directly)
                    # ESM: https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis-filtering.html
                    # Pipes: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-kinesis.html
                    # Pipes filtering: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-filtering.html
                    parsed_event = deepcopy(event)
                    parsed_event["data"] = data

                    parsed_events.append(parsed_event)
                except json.JSONDecodeError:
                    LOG.warning(
                        "Unable to convert event data '%s' to json... Record will be dropped.",
                        raw_data,
                        exc_info=LOG.isEnabledFor(logging.DEBUG),
                    )
            return parsed_events
        else:
            return events

    def post_filter(self, events: list[dict]) -> list[dict]:
        """Undo the pre_filter transformation: re-encode the parsed "data" field back
        into the record's original base64 location.
        """
        if has_data_filter_criteria_parsed(self.filter_patterns):
            # convert them back (HACK for fixing parity with v1 and getting regression tests passing)
            for event in events:
                parsed_data = event.pop("data")
                encoded_data = self.encode_data(parsed_data)
                self.set_data(event, encoded_data)
        return events

    def get_data(self, event: dict) -> str:
        """Return the base64-encoded record payload, honoring the namespace layout."""
        if self.kinesis_namespace:
            return event["kinesis"]["data"]
        else:
            return event["data"]

    def set_data(self, event: dict, data: bytes) -> None:
        """Write the record payload back into the event, honoring the namespace layout."""
        if self.kinesis_namespace:
            event["kinesis"]["data"] = data
        else:
            event["data"] = data

    def parse_data(self, raw_data: str) -> dict | str:
        """Base64-decode and JSON-parse a record payload. Raises json.JSONDecodeError
        if the decoded bytes are not valid JSON (handled by the caller in pre_filter)."""
        decoded_data = base64.b64decode(raw_data)
        return json.loads(decoded_data)

    def encode_data(self, parsed_data: dict) -> str:
        """Serialize parsed data back to JSON and base64-encode it as a string."""
        return base64.b64encode(json.dumps(parsed_data).encode()).decode()
206

207

208
def has_data_filter_criteria_parsed(parsed_filters: list[dict]) -> bool:
    """Return True if at least one parsed filter pattern filters on the "data" field."""
    return any("data" in pattern for pattern in parsed_filters)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc