• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 96c05c56-d484-41e1-aa95-8bc070bb9477

07 Apr 2025 07:41AM UTC coverage: 86.842% (+0.03%) from 86.81%
96c05c56-d484-41e1-aa95-8bc070bb9477

push

circleci

web-flow
Update ASF APIs, update events provider signature (#12490)

Co-authored-by: LocalStack Bot <localstack-bot@users.noreply.github.com>
Co-authored-by: Alexander Rashed <alexander.rashed@localstack.cloud>

63569 of 73201 relevant lines covered (86.84%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.65
/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/kinesis_poller.py
1
import base64
1✔
2
import json
1✔
3
import logging
1✔
4
from copy import deepcopy
1✔
5
from datetime import datetime
1✔
6

7
from botocore.client import BaseClient
1✔
8

9
from localstack.aws.api.kinesis import StreamStatus
1✔
10
from localstack.aws.api.pipes import (
1✔
11
    KinesisStreamStartPosition,
12
)
13
from localstack.services.lambda_.event_source_mapping.event_processor import (
1✔
14
    EventProcessor,
15
)
16
from localstack.services.lambda_.event_source_mapping.pollers.stream_poller import StreamPoller
1✔
17
from localstack.utils.strings import to_str
1✔
18

19
LOG = logging.getLogger(__name__)
1✔
20

21

22
class KinesisPoller(StreamPoller):
    """Stream poller that reads records from a Kinesis stream and formats them as
    Lambda ESM or EventBridge Pipes events.

    The two consumers differ only in event shape: Lambda ESM nests record fields
    under a `kinesis` key, Pipes keeps them at the top level (see `kinesis_namespace`).
    """

    # The role ARN of the processor (e.g., role ARN of the Pipe)
    invoke_identity_arn: str | None
    # Flag to enable nested kinesis namespace when formatting events to support the nested `kinesis` field structure
    # used for Lambda ESM: https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-event-example
    # EventBridge Pipes uses no nesting: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-kinesis.html
    kinesis_namespace: bool

    def __init__(
        self,
        source_arn: str,
        source_parameters: dict | None = None,
        source_client: BaseClient | None = None,
        processor: EventProcessor | None = None,
        partner_resource_arn: str | None = None,
        invoke_identity_arn: str | None = None,
        kinesis_namespace: bool = False,
        esm_uuid: str | None = None,
    ):
        super().__init__(
            source_arn,
            source_parameters,
            source_client,
            processor,
            esm_uuid=esm_uuid,
            partner_resource_arn=partner_resource_arn,
        )
        self.invoke_identity_arn = invoke_identity_arn
        self.kinesis_namespace = kinesis_namespace

    @property
    def stream_parameters(self) -> dict:
        """Kinesis-specific source parameters (Pipes `KinesisStreamParameters` shape)."""
        return self.source_parameters["KinesisStreamParameters"]

    def initialize_shards(self) -> dict[str, str]:
        """Return a mapping of shard id => shard iterator for every shard of the stream.

        Returns an empty dict (and logs a warning) if the stream is not ACTIVE.
        """
        # TODO: cache this and update/re-try upon failures
        stream_info = self.source_client.describe_stream(StreamARN=self.source_arn)
        stream_status = stream_info["StreamDescription"]["StreamStatus"]
        if stream_status != StreamStatus.ACTIVE:
            LOG.warning(
                "Stream %s is not active. Current status: %s",
                self.source_arn,
                stream_status,
            )
            return {}

        # NOTICE: re-sharding might require updating this periodically (unknown how Pipes does it!?)
        # Mapping of shard id => shard iterator
        shards = {}
        for shard in stream_info["StreamDescription"]["Shards"]:
            shard_id = shard["ShardId"]
            starting_position = self.stream_parameters["StartingPosition"]
            kwargs = {}
            # TODO: test StartingPosition=AT_TIMESTAMP (only supported for Kinesis!)
            if starting_position == KinesisStreamStartPosition.AT_TIMESTAMP:
                # GetShardIterator requires the `Timestamp` parameter for
                # ShardIteratorType=AT_TIMESTAMP; `StartingSequenceNumber` is only
                # valid for the AT/AFTER_SEQUENCE_NUMBER iterator types.
                kwargs["Timestamp"] = self.stream_parameters["StartingPositionTimestamp"]
            get_shard_iterator_response = self.source_client.get_shard_iterator(
                StreamARN=self.source_arn,
                ShardId=shard_id,
                ShardIteratorType=starting_position,
                **kwargs,
            )
            shards[shard_id] = get_shard_iterator_response["ShardIterator"]

        # Log the freshly built mapping (not self.shards, which is only updated by the caller)
        LOG.debug("Event source %s has %d shards.", self.source_arn, len(shards))
        return shards

    def stream_arn_param(self) -> dict:
        """Kinesis APIs address the stream via `StreamARN` (DynamoDB Streams differ)."""
        return {"StreamARN": self.source_arn}

    def event_source(self) -> str:
        """Value of the `eventSource` field in generated events."""
        return "aws:kinesis"

    def extra_metadata(self) -> dict:
        """Kinesis-specific metadata merged into every generated event."""
        return {
            "eventVersion": "1.0",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": self.invoke_identity_arn,
        }

    def transform_into_events(self, records: list[dict], shard_id) -> list[dict]:
        """Convert raw `get_records` results into ESM/Pipes event dicts for one shard."""
        events = []
        for record in records:
            # TODO: consolidate with Kinesis event source listener:
            #  localstack.services.lambda_.event_source_listeners.kinesis_event_source_listener.KinesisEventSourceListener._create_lambda_event_payload
            #  check `encryptionType` leading to serialization errors by Dotnet Lambdas
            sequence_number = record["SequenceNumber"]
            event = {
                # TODO: add this metadata after filtering.
                #  This requires some design adjustment because the sequence number depends on the record.
                "eventID": f"{shard_id}:{sequence_number}",
            }
            kinesis_fields = {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": record["PartitionKey"],
                "sequenceNumber": sequence_number,
                # TODO: implement heuristic based on content type: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-filtering.html#pipes-filter-sqs
                # boto3 automatically decodes records in get_records(), so we must re-encode
                "data": to_str(base64.b64encode(record["Data"])),
                "approximateArrivalTimestamp": record["ApproximateArrivalTimestamp"].timestamp(),
            }
            if self.kinesis_namespace:
                event["kinesis"] = kinesis_fields
            else:
                event.update(kinesis_fields)
            events.append(event)
        return events

    def failure_payload_details_field_name(self) -> str:
        """Key under which batch failure details are reported (DLQ/failure destination)."""
        return "KinesisBatchInfo"

    def get_approximate_arrival_time(self, record: dict) -> float:
        """Read the arrival timestamp from an already-formatted event (namespace-aware)."""
        if self.kinesis_namespace:
            return record["kinesis"]["approximateArrivalTimestamp"]
        else:
            return record["approximateArrivalTimestamp"]

    def format_datetime(self, time: datetime) -> str:
        """Render a datetime in the millisecond-ISO + 'Z' format used in failure payloads.

        NOTE(review): assumes `time` is naive/UTC — a tz-aware datetime would yield
        an offset followed by 'Z'; confirm against callers.
        """
        return f"{time.isoformat(timespec='milliseconds')}Z"

    def get_sequence_number(self, record: dict) -> str:
        """Read the sequence number from an already-formatted event (namespace-aware)."""
        if self.kinesis_namespace:
            return record["kinesis"]["sequenceNumber"]
        else:
            return record["sequenceNumber"]

    def pre_filter(self, events: list[dict]) -> list[dict]:
        """If any filter targets `data`, parse each event's payload into a top-level
        `data` field so filter criteria can match it; drop events whose payload is
        not valid JSON. Without data filters, events pass through unchanged."""
        # TODO: test what happens with a mixture of data and non-data filters?
        if has_data_filter_criteria_parsed(self.filter_patterns):
            parsed_events = []
            for event in events:
                raw_data = self.get_data(event)
                try:
                    data = self.parse_data(raw_data)
                    # TODO: test "data" key remapping
                    # Filtering remaps "kinesis.data" in ESM to "data (idempotent for Pipes using "data" directly)
                    # ESM: https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis-filtering.html
                    # Pipes: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-kinesis.html
                    # Pipes filtering: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-filtering.html
                    parsed_event = deepcopy(event)
                    parsed_event["data"] = data

                    parsed_events.append(parsed_event)
                except json.JSONDecodeError:
                    LOG.warning(
                        "Unable to convert event data '%s' to json... Record will be dropped.",
                        raw_data,
                        exc_info=LOG.isEnabledFor(logging.DEBUG),
                    )
            return parsed_events
        else:
            return events

    def post_filter(self, events: list[dict]) -> list[dict]:
        """Undo the `pre_filter` remapping: re-encode the parsed `data` field back
        into its base64 position within the event."""
        if has_data_filter_criteria_parsed(self.filter_patterns):
            # convert them back (HACK for fixing parity with v1 and getting regression tests passing)
            for event in events:
                parsed_data = event.pop("data")
                encoded_data = self.encode_data(parsed_data)
                self.set_data(event, encoded_data)
        return events

    def get_data(self, event: dict) -> str:
        """Read the base64 payload from a formatted event (namespace-aware)."""
        if self.kinesis_namespace:
            return event["kinesis"]["data"]
        else:
            return event["data"]

    def set_data(self, event: dict, data: bytes) -> None:
        """Write the payload back into a formatted event (namespace-aware)."""
        if self.kinesis_namespace:
            event["kinesis"]["data"] = data
        else:
            event["data"] = data

    def parse_data(self, raw_data: str) -> dict | str:
        """Base64-decode and JSON-parse a record payload. Raises json.JSONDecodeError."""
        decoded_data = base64.b64decode(raw_data)
        return json.loads(decoded_data)

    def encode_data(self, parsed_data: dict) -> str:
        """Inverse of `parse_data`: JSON-serialize and base64-encode a payload."""
        return base64.b64encode(json.dumps(parsed_data).encode()).decode()
204

205

206
def has_data_filter_criteria_parsed(parsed_filters: list[dict]) -> bool:
    """Return True if at least one parsed filter pattern contains a "data" key."""
    return any("data" in pattern for pattern in parsed_filters)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc