localstack / localstack, build 20565403496
29 Dec 2025 05:11AM UTC coverage: 84.103% (-2.8%) from 86.921%
Pull Request #13567: Update ASF APIs (merge 4816837a5 into 2417384aa)
67166 of 79862 relevant lines covered (84.1%), 0.84 hits per line

Source file (86.94% covered):
/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py

import json
import logging
import threading
from abc import abstractmethod
from bisect import bisect_left
from collections import defaultdict
from collections.abc import Iterator
from datetime import datetime

from botocore.client import BaseClient
from botocore.exceptions import ClientError

from localstack.aws.api.pipes import (
    OnPartialBatchItemFailureStreams,
)
from localstack.services.lambda_.event_source_mapping.event_processor import (
    BatchFailureError,
    CustomerInvocationError,
    EventProcessor,
    PartialBatchFailureError,
    PipeInternalError,
)
from localstack.services.lambda_.event_source_mapping.pipe_utils import (
    get_current_time,
    get_datetime_from_timestamp,
    get_internal_client,
)
from localstack.services.lambda_.event_source_mapping.pollers.poller import (
    EmptyPollResultsException,
    Poller,
    get_batch_item_failures,
)
from localstack.services.lambda_.event_source_mapping.pollers.sqs_poller import get_queue_url
from localstack.services.lambda_.event_source_mapping.senders.sender_utils import (
    batched,
)
from localstack.utils.aws.arns import parse_arn, s3_bucket_name
from localstack.utils.backoff import ExponentialBackoff
from localstack.utils.batching import Batcher
from localstack.utils.strings import long_uid

LOG = logging.getLogger(__name__)


# TODO: fix this poller to support resharding
#   https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-resharding.html
class StreamPoller(Poller):
    # Mapping of shard id => shard iterator
    # TODO: This mapping approach needs to be re-worked to instead store last processed sequence number.
    shards: dict[str, str]
    # Iterator for round-robin polling from different shards because a batch cannot contain events from different shards
    # This is a workaround for not handling shards in parallel.
    iterator_over_shards: Iterator[tuple[str, str]] | None
    # ESM UUID is needed in failure processing to form s3 failure destination object key
    esm_uuid: str | None

    # The ARN of the processor (e.g., Pipe ARN)
    partner_resource_arn: str | None

    # Used for backing-off between retries and breaking the retry loop
    _is_shutdown: threading.Event

    # Collects and flushes a batch of records based on a batching policy
    shard_batcher: dict[str, Batcher[dict]]

    def __init__(
        self,
        source_arn: str,
        source_parameters: dict | None = None,
        source_client: BaseClient | None = None,
        processor: EventProcessor | None = None,
        partner_resource_arn: str | None = None,
        esm_uuid: str | None = None,
        shards: dict[str, str] | None = None,
    ):
        super().__init__(source_arn, source_parameters, source_client, processor)
        self.partner_resource_arn = partner_resource_arn
        self.esm_uuid = esm_uuid
        self.shards = shards if shards is not None else {}
        self.iterator_over_shards = None

        self._is_shutdown = threading.Event()

        self.shard_batcher = defaultdict(
            lambda: Batcher(
                max_count=self.stream_parameters.get("BatchSize", 100),
                max_window=self.stream_parameters.get("MaximumBatchingWindowInSeconds", 0),
            )
        )

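    # The provider-specific pieces below are filled in by the concrete Kinesis and DynamoDB
    # stream pollers; the docstrings note where the two services differ.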
    @abstractmethod
    def transform_into_events(self, records: list[dict], shard_id) -> list[dict]:
        pass

    @property
    @abstractmethod
    def stream_parameters(self) -> dict:
        pass

    @abstractmethod
    def initialize_shards(self) -> dict[str, str]:
        """Returns a shard dict mapping from shard id -> shard iterator
        The implementations for Kinesis and DynamoDB are similar but differ in various ways:
        * Kinesis uses "StreamARN" and DynamoDB uses "StreamArn" as source parameter
        * Kinesis uses "StreamStatus.ACTIVE" and DynamoDB uses "StreamStatus.ENABLED"
        * Only Kinesis supports the additional StartingPosition called "AT_TIMESTAMP" using "StartingPositionTimestamp"
        """
        pass

    @abstractmethod
    def stream_arn_param(self) -> dict:
        """Returns a dict of the correct key/value pair for the stream arn used in GetRecords.
        Either StreamARN for Kinesis or {} for DynamoDB (unsupported)"""
        pass

    @abstractmethod
    def failure_payload_details_field_name(self) -> str:
        pass

    @abstractmethod
    def get_approximate_arrival_time(self, record: dict) -> float:
        pass

    @abstractmethod
    def format_datetime(self, time: datetime) -> str:
        """Formats a datetime in the correct format for DynamoDB (with ms) or Kinesis (without ms)"""
        pass

    @abstractmethod
    def get_sequence_number(self, record: dict) -> str:
        pass

    def close(self):
        self._is_shutdown.set()

    def pre_filter(self, events: list[dict]) -> list[dict]:
        return events

    def post_filter(self, events: list[dict]) -> list[dict]:
        return events

    def poll_events(self):
        """Generalized poller for streams such as Kinesis or DynamoDB
        Examples of Kinesis consumers:
        * StackOverflow: https://stackoverflow.com/a/22403036/6875981
        * AWS Sample: https://github.com/aws-samples/kinesis-poster-worker/blob/master/worker.py
        Examples of DynamoDB consumers:
        * Blogpost: https://www.tecracer.com/blog/2022/05/getting-a-near-real-time-view-of-a-dynamodb-stream-with-python.html
        """
        # TODO: consider potential shard iterator timeout after 300 seconds (likely not relevant with short-polling):
        #   https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#shard-iterator-expires-unexpectedly
        #  Does this happen if no records are received for 300 seconds?
        if not self.shards:
            self.shards = self.initialize_shards()

        if not self.shards:
            LOG.debug("No shards found for %s.", self.source_arn)
            raise EmptyPollResultsException(service=self.event_source(), source_arn=self.source_arn)
        else:
            # Remove all shard batchers without corresponding shards
            for shard_id in self.shard_batcher.keys() - self.shards.keys():
                self.shard_batcher.pop(shard_id, None)

        # TODO: improve efficiency because this currently limits the throughput to at most batch size per poll interval
        # Handle shards round-robin. Re-initialize current shard iterator once all shards are handled.
        if self.iterator_over_shards is None:
            self.iterator_over_shards = iter(self.shards.items())

        current_shard_tuple = next(self.iterator_over_shards, None)
        if not current_shard_tuple:
            self.iterator_over_shards = iter(self.shards.items())
            current_shard_tuple = next(self.iterator_over_shards, None)

        # TODO Better handling when shards are initialised and the iterator returns nothing
        if not current_shard_tuple:
            raise PipeInternalError(
                "Failed to retrieve any shards for stream polling despite initialization."
            )

        try:
            self.poll_events_from_shard(*current_shard_tuple)
        except PipeInternalError:
            # TODO: standardize logging
            # Ignore and wait for the next polling interval, which will retry
            pass

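    # Per-shard polling: fetch one page of records, persist the NextShardIterator (or
    # re-initialize the shards if the iterator is exhausted), accumulate the records in the
    # shard's batcher, and flush them in BatchSize-sized batches to forward_events_to_target.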
    def poll_events_from_shard(self, shard_id: str, shard_iterator: str):
        get_records_response = self.get_records(shard_iterator)
        records: list[dict] = get_records_response.get("Records", [])
        if not (next_shard_iterator := get_records_response.get("NextShardIterator")):
            # If the next shard iterator is None, we can assume the shard is closed or
            # has expired on the DynamoDB Local server, hence we should re-initialize.
            self.shards = self.initialize_shards()
        else:
            # We should always be storing the next_shard_iterator value, otherwise we risk an iterator expiring
            # and all records being re-processed.
            self.shards[shard_id] = next_shard_iterator

        # We cannot reliably back-off when no records are found since an iterator
        # may have to move multiple times until records are returned.
        # See https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#getrecords-returns-empty
        # However, we still need to check if the batcher should be triggered due to time-based batching.
        should_flush = self.shard_batcher[shard_id].add(records)
        if not should_flush:
            return

        # Retrieve and drain all events in batcher
        collected_records = self.shard_batcher[shard_id].flush()
        # If there is overflow (e.g., 1K BatchSize and 1.2K returned in a flush), further split up the batch.
        for batch in batched(collected_records, self.stream_parameters.get("BatchSize")):
            # This could potentially lead to data loss if forward_events_to_target raises an exception after a flush
            # which would otherwise be solved with checkpointing.
            # TODO: Implement checkpointing, leasing, etc. from https://docs.aws.amazon.com/streams/latest/dev/kcl-concepts.html
            self.forward_events_to_target(shard_id, batch)

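    # Forwarding pipeline: transform records into events, apply pre-filter, event filtering, and
    # post-filter, attach source metadata, then invoke the processor with exponential backoff,
    # bounded by MaximumRetryAttempts and MaximumRecordAgeInSeconds. Partial batch failures are
    # retried from the lowest failed sequence number; exhausted retries or expired records are
    # sent to the configured failure destination via send_events_to_dlq.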
    def forward_events_to_target(self, shard_id, records):
        polled_events = self.transform_into_events(records, shard_id)
        abort_condition = None
        # TODO: implement format detection behavior (e.g., for JSON body):
        #  https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-filtering.html
        #  Check whether we need poller-specific filter-preprocessing here without modifying the actual event!
        # convert to json for filtering (HACK for fixing parity with v1 and getting regression tests passing)
        # localstack.services.lambda_.event_source_listeners.kinesis_event_source_listener.KinesisEventSourceListener._filter_records
        # TODO: explore better abstraction for the entire filtering, including the set_data and get_data remapping
        #  We need to clarify which transformations happen before and after filtering -> fix missing test coverage
        parsed_events = self.pre_filter(polled_events)
        # TODO: advance iterator past matching events!
        #  We need to checkpoint the sequence number for each shard and then advance the shard iterator using
        #  GetShardIterator with a given sequence number
        #  https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html
        #  Failing to do so kinda blocks the stream resulting in very high latency.
        matching_events = self.filter_events(parsed_events)
        matching_events_post_filter = self.post_filter(matching_events)

        # TODO: implement MaximumBatchingWindowInSeconds flush condition (before or after filter?)
        # Don't trigger upon empty events
        if len(matching_events_post_filter) == 0:
            return

        events = self.add_source_metadata(matching_events_post_filter)
        LOG.debug("Polled %d events from %s in shard %s", len(events), self.source_arn, shard_id)
        #  -> This could be tested by setting a high retry number, using a long pipe execution, and a relatively
        #  short record expiration age at the source. Check what happens if the record expires at the source.
        #  A potential implementation could use checkpointing based on the iterator position (within shard scope)
        # TODO: handle partial batch failure (see poller.py:parse_batch_item_failures)
        # TODO: think about how to avoid starvation of other shards if one shard runs into infinite retries
        attempts = 0
        discarded_events_for_dlq = []
        error_payload = {}

        max_retries = self.stream_parameters.get("MaximumRetryAttempts", -1)
        max_record_age = max(
            self.stream_parameters.get("MaximumRecordAgeInSeconds", -1), 0
        )  # Disable check if -1
        # NOTE: max_retries == 0 means exponential backoff is disabled
        boff = ExponentialBackoff(max_retries=max_retries)
        while not abort_condition and events and not self._is_shutdown.is_set():
            if self.max_retries_exceeded(attempts):
                abort_condition = "RetryAttemptsExhausted"
                break

            if max_record_age:
                events, expired_events = self.bisect_events_by_record_age(max_record_age, events)
                if expired_events:
                    discarded_events_for_dlq.extend(expired_events)
                    continue

            try:
                if attempts > 0:
                    # TODO: Should we always backoff (with jitter) before processing since we may not want multiple pollers
                    # all starting up and polling simultaneously
                    # For example: 500 persisted ESMs starting up and requesting concurrently could flood the gateway
                    self._is_shutdown.wait(boff.next_backoff())

                self.processor.process_events_batch(events)
                boff.reset()
                # We may still need to send data on to a DLQ, so break the processing loop and proceed if the invocation was successful.
                break
            except PartialBatchFailureError as ex:
                # TODO: add tests for partial batch failure scenarios
                if (
                    self.stream_parameters.get("OnPartialBatchItemFailure")
                    == OnPartialBatchItemFailureStreams.AUTOMATIC_BISECT
                ):
                    # TODO: implement and test splitting batches in half until batch size 1
                    #  https://docs.aws.amazon.com/eventbridge/latest/pipes-reference/API_PipeSourceKinesisStreamParameters.html
                    LOG.warning(
                        "AUTOMATIC_BISECT upon partial batch item failure is not yet implemented. Retrying the entire batch."
                    )
                error_payload = ex.error

                # Extract all sequence numbers from events in the batch. This allows us to fail the whole batch if
                # an unknown itemIdentifier is returned.
                batch_sequence_numbers = {
                    self.get_sequence_number(event) for event in matching_events
                }

                # If the batchItemFailures array contains multiple items, Lambda uses the record with the lowest sequence number as the checkpoint.
                # Lambda then retries all records starting from that checkpoint.
                failed_sequence_ids: list[int] | None = get_batch_item_failures(
                    ex.partial_failure_payload, batch_sequence_numbers
                )

                # If None is returned, consider the entire batch a failure.
                if failed_sequence_ids is None:
                    continue

                # This shouldn't be possible since a PartialBatchFailureError was raised
                if len(failed_sequence_ids) == 0:
                    assert failed_sequence_ids, (
                        "Invalid state encountered: PartialBatchFailureError raised but no batch item failures found."
                    )

                lowest_sequence_id: str = min(failed_sequence_ids, key=int)

                # Discard all successful events and re-process from the sequence number of the failed event
                _, events = self.bisect_events(lowest_sequence_id, events)
            except BatchFailureError as ex:
                error_payload = ex.error

                # FIXME partner_resource_arn is not defined in ESM
                LOG.debug(
                    "Attempt %d failed while processing %s with events: %s",
                    attempts,
                    self.partner_resource_arn or self.source_arn,
                    events,
                    exc_info=LOG.isEnabledFor(logging.DEBUG),
                )
            except Exception:
                # FIXME partner_resource_arn is not defined in ESM
                LOG.error(
                    "Attempt %d failed with unexpected error while processing %s with events: %s",
                    attempts,
                    self.partner_resource_arn or self.source_arn,
                    events,
                    exc_info=LOG.isEnabledFor(logging.DEBUG),
                )
            finally:
                # Retry polling until the record expires at the source
                attempts += 1

        if discarded_events_for_dlq:
            abort_condition = "RecordAgeExceeded"
            error_payload = {}
            events = discarded_events_for_dlq

        # Send failed events to potential DLQ
        if abort_condition:
            failure_context = self.processor.generate_event_failure_context(
                abort_condition=abort_condition,
                error=error_payload,
                attempts_count=attempts,
                partner_resource_arn=self.partner_resource_arn,
            )
            self.send_events_to_dlq(shard_id, events, context=failure_context)

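    # get_records wraps the provider's GetRecords call and maps errors: expired or trimmed
    # iterators re-initialize the shards and raise PipeInternalError, while missing permissions
    # or a missing stream generally raise CustomerInvocationError.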
    def get_records(self, shard_iterator: str) -> dict:
        """Returns a GetRecordsOutput from the GetRecords endpoint of streaming services such as Kinesis or DynamoDB"""
        try:
            get_records_response = self.source_client.get_records(
                # TODO: add test for cross-account scenario
                # Differs for Kinesis and DynamoDB but required for cross-account scenario
                **self.stream_arn_param(),
                ShardIterator=shard_iterator,
                Limit=self.stream_parameters["BatchSize"],
            )
            return get_records_response
        # TODO: test iterator expired with conditional error scenario (requires failure destinations)
        except self.source_client.exceptions.ExpiredIteratorException as e:
            LOG.debug(
                "Shard iterator %s expired for stream %s, re-initializing shards",
                shard_iterator,
                self.source_arn,
            )
            # TODO: test TRIM_HORIZON and AT_TIMESTAMP scenarios for this case. We don't want to start from scratch and
            #  might need to think about checkpointing here.
            self.shards = self.initialize_shards()
            raise PipeInternalError from e
        except ClientError as e:
            if "AccessDeniedException" in str(e):
                LOG.warning(
                    "Insufficient permissions to get records from stream %s: %s",
                    self.source_arn,
                    e,
                )
                raise CustomerInvocationError from e
            elif "ResourceNotFoundException" in str(e):
                # FIXME: The 'Invalid ShardId in ShardIterator' error is returned by DynamoDB-local. Unsure when/why this is returned.
                if "Invalid ShardId in ShardIterator" in str(e):
                    LOG.warning(
                        "Invalid ShardId in ShardIterator for %s. Re-initializing shards.",
                        self.source_arn,
                    )
                    self.shards = self.initialize_shards()
                else:
                    LOG.warning(
                        "Source stream %s does not exist: %s",
                        self.source_arn,
                        e,
                    )
                    raise CustomerInvocationError from e
            elif "TrimmedDataAccessException" in str(e):
                LOG.debug(
                    "Attempted to iterate over trimmed record or expired shard iterator %s for stream %s, re-initializing shards",
                    shard_iterator,
                    self.source_arn,
                )
                self.shards = self.initialize_shards()
            else:
                LOG.debug("ClientError during get_records for stream %s: %s", self.source_arn, e)
            raise PipeInternalError from e

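    # send_events_to_dlq builds a failure record via create_dlq_event and delivers it to the
    # configured DeadLetterConfig target; SQS queues, SNS topics, and S3 buckets are supported,
    # with the raw records only included in the S3 payload.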
    def send_events_to_dlq(self, shard_id, events, context) -> None:
        dlq_arn = self.stream_parameters.get("DeadLetterConfig", {}).get("Arn")
        if dlq_arn:
            failure_timestamp = get_current_time()
            dlq_event = self.create_dlq_event(shard_id, events, context, failure_timestamp)
            # Send DLQ event to DLQ target
            parsed_arn = parse_arn(dlq_arn)
            service = parsed_arn["service"]
            # TODO: use a sender instance here, likely inject via DI into poller (what if it updates?)
            if service == "sqs":
                # TODO: inject and cache SQS client using proper IAM role (supports cross-account operations)
                sqs_client = get_internal_client(dlq_arn)
                # TODO: check if the DLQ exists
                dlq_url = get_queue_url(dlq_arn)
                # TODO: validate no FIFO queue because they are unsupported
                sqs_client.send_message(QueueUrl=dlq_url, MessageBody=json.dumps(dlq_event))
            elif service == "sns":
                sns_client = get_internal_client(dlq_arn)
                sns_client.publish(TopicArn=dlq_arn, Message=json.dumps(dlq_event))
            elif service == "s3":
                s3_client = get_internal_client(dlq_arn)
                dlq_event_with_payload = {
                    **dlq_event,
                    "payload": {
                        "Records": events,
                    },
                }
                s3_client.put_object(
                    Bucket=s3_bucket_name(dlq_arn),
                    Key=get_failure_s3_object_key(self.esm_uuid, shard_id, failure_timestamp),
                    Body=json.dumps(dlq_event_with_payload),
                )
            else:
                LOG.warning("Unsupported DLQ service %s", service)

    def create_dlq_event(
        self, shard_id: str, events: list[dict], context: dict, failure_timestamp: datetime
    ) -> dict:
        first_record = events[0]
        first_record_arrival = get_datetime_from_timestamp(
            self.get_approximate_arrival_time(first_record)
        )

        last_record = events[-1]
        last_record_arrival = get_datetime_from_timestamp(
            self.get_approximate_arrival_time(last_record)
        )
        return {
            **context,
            self.failure_payload_details_field_name(): {
                "approximateArrivalOfFirstRecord": self.format_datetime(first_record_arrival),
                "approximateArrivalOfLastRecord": self.format_datetime(last_record_arrival),
                "batchSize": len(events),
                "endSequenceNumber": self.get_sequence_number(last_record),
                "shardId": shard_id,
                "startSequenceNumber": self.get_sequence_number(first_record),
                "streamArn": self.source_arn,
            },
            "timestamp": failure_timestamp.isoformat(timespec="milliseconds").replace(
                "+00:00", "Z"
            ),
            "version": "1.0",
        }

    def max_retries_exceeded(self, attempts: int) -> bool:
        maximum_retry_attempts = self.stream_parameters.get("MaximumRetryAttempts", -1)
        # Infinite retries until the source expires
        if maximum_retry_attempts == -1:
            return False
        return attempts > maximum_retry_attempts

    def bisect_events(
        self, sequence_number: str, events: list[dict]
    ) -> tuple[list[dict], list[dict]]:
        """Splits a list of events in two at the event whose sequence number equals the passed `sequence_number`.
        This is used for:
          - `ReportBatchItemFailures`: Discarding events in a batch following a failure when it is set.
          - `BisectBatchOnFunctionError`: Used to split a failed batch in two when doing a retry (not implemented)."""
        for i, event in enumerate(events):
            if self.get_sequence_number(event) == sequence_number:
                return events[:i], events[i:]

        return events, []

    def bisect_events_by_record_age(
        self, maximum_record_age: int, events: list[dict]
    ) -> tuple[list[dict], list[dict]]:
        """Splits events into [valid_events], [expired_events] based on record age.
        Where:
          - Events with age < maximum_record_age are valid.
          - Events with age >= maximum_record_age are expired."""
        cutoff_timestamp = get_current_time().timestamp() - maximum_record_age
        index = bisect_left(events, cutoff_timestamp, key=self.get_approximate_arrival_time)
        return events[index:], events[:index]


def get_failure_s3_object_key(esm_uuid: str, shard_id: str, failure_datetime: datetime) -> str:
    """
    From https://docs.aws.amazon.com/lambda/latest/dg/kinesis-on-failure-destination.html:

    The S3 object containing the invocation record uses the following naming convention:
    aws/lambda/<ESM-UUID>/<shardID>/YYYY/MM/DD/YYYY-MM-DDTHH.MM.SS-<Random UUID>

    :return: Key for the S3 object that the invocation failure record will be put to
    """
    timestamp = failure_datetime.strftime("%Y-%m-%dT%H.%M.%S")
    year_month_day = failure_datetime.strftime("%Y/%m/%d")
    random_uuid = long_uid()
    return f"aws/lambda/{esm_uuid}/{shard_id}/{year_month_day}/{timestamp}-{random_uuid}"
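
For illustration, a minimal standalone sketch (not part of the file above) of the object key layout produced by get_failure_s3_object_key. The ESM UUID, shard id, and timestamp are hypothetical placeholders, and uuid4 stands in for LocalStack's long_uid helper:

from datetime import datetime, timezone
from uuid import uuid4

# Hypothetical inputs; any ESM UUID and shard id follow the same layout.
esm_uuid = "01234567-89ab-cdef-0123-456789abcdef"
shard_id = "shardId-000000000000"
failure_time = datetime(2025, 12, 29, 5, 11, 0, tzinfo=timezone.utc)

# Same convention as get_failure_s3_object_key:
# aws/lambda/<ESM-UUID>/<shardID>/YYYY/MM/DD/YYYY-MM-DDTHH.MM.SS-<Random UUID>
key = (
    f"aws/lambda/{esm_uuid}/{shard_id}/"
    f"{failure_time.strftime('%Y/%m/%d')}/"
    f"{failure_time.strftime('%Y-%m-%dT%H.%M.%S')}-{uuid4()}"
)
print(key)
# e.g. aws/lambda/01234567-89ab-cdef-0123-456789abcdef/shardId-000000000000/2025/12/29/2025-12-29T05.11.00-<random uuid>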