• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 858585c6-6e27-4bca-a3b8-78d6d978f737

25 Mar 2025 06:43PM UTC coverage: 86.88% (+0.02%) from 86.865%
858585c6-6e27-4bca-a3b8-78d6d978f737

push

circleci

web-flow
[Utils] Add a batch policy utility (#12430)

53 of 56 new or added lines in 1 file covered. (94.64%)

227 existing lines in 12 files now uncovered.

63251 of 72803 relevant lines covered (86.88%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.04
/localstack-core/localstack/services/lambda_/event_source_mapping/esm_event_processor.py
1
import json
1✔
2
import logging
1✔
3
import uuid
1✔
4

5
from localstack.aws.api.pipes import LogLevel
1✔
6
from localstack.services.lambda_.analytics import EsmExecutionStatus, esm_counter
1✔
7
from localstack.services.lambda_.event_source_mapping.event_processor import (
1✔
8
    BatchFailureError,
9
    EventProcessor,
10
    PartialBatchFailureError,
11
)
12
from localstack.services.lambda_.event_source_mapping.pipe_loggers.pipe_logger import PipeLogger
1✔
13
from localstack.services.lambda_.event_source_mapping.pipe_utils import to_json_str
1✔
14
from localstack.services.lambda_.event_source_mapping.senders.sender import (
1✔
15
    PartialFailureSenderError,
16
    Sender,
17
    SenderError,
18
)
19

20
LOG = logging.getLogger(__name__)
1✔
21

22

23
class EsmEventProcessor(EventProcessor):
1✔
24
    sender: Sender
1✔
25
    logger: PipeLogger
1✔
26

27
    def __init__(self, sender, logger):
1✔
28
        self.sender = sender
1✔
29
        self.logger = logger
1✔
30

31
    def process_events_batch(self, input_events: list[dict] | dict) -> None:
1✔
32
        # analytics
33
        if isinstance(input_events, list) and input_events:
1✔
34
            first_event = input_events[0]
1✔
35
        elif input_events:
×
36
            first_event = input_events
×
37
        else:
38
            first_event = {}
×
39
        event_source = first_event.get("eventSource")
1✔
40

41
        execution_id = uuid.uuid4()
1✔
42
        # Create a copy of the original input events
43
        events = input_events.copy()
1✔
44
        try:
1✔
45
            self.logger.set_fields(executionId=str(execution_id))
1✔
46
            self.logger.log(
1✔
47
                messageType="ExecutionStarted",
48
                logLevel=LogLevel.INFO,
49
                payload=to_json_str(events),
50
            )
51
            # An execution is only triggered upon successful polling. Therefore, `PollingStageStarted` never occurs.
52
            self.logger.log(
1✔
53
                messageType="PollingStageSucceeded",
54
                logLevel=LogLevel.TRACE,
55
            )
56
            # Target Stage
57
            self.process_target_stage(events)
1✔
58
            self.logger.log(
1✔
59
                messageType="ExecutionSucceeded",
60
                logLevel=LogLevel.INFO,
61
            )
62
            esm_counter.labels(source=event_source, status=EsmExecutionStatus.success).increment()
1✔
63
        except PartialFailureSenderError as e:
1✔
64
            self.logger.log(
1✔
65
                messageType="ExecutionFailed",
66
                logLevel=LogLevel.ERROR,
67
                error=e.error,
68
            )
69
            esm_counter.labels(
1✔
70
                source=event_source, status=EsmExecutionStatus.partial_batch_failure_error
71
            ).increment()
72
            # TODO: check whether partial batch item failures is enabled by default or need to be explicitly enabled
73
            #  using --function-response-types "ReportBatchItemFailures"
74
            #  https://docs.aws.amazon.com/lambda/latest/dg/services-sqs-errorhandling.html
75
            raise PartialBatchFailureError(
1✔
76
                partial_failure_payload=e.partial_failure_payload, error=e.error
77
            ) from e
78
        except SenderError as e:
1✔
79
            self.logger.log(
1✔
80
                messageType="ExecutionFailed",
81
                logLevel=LogLevel.ERROR,
82
                error=e.error,
83
            )
84
            esm_counter.labels(
1✔
85
                source=event_source, status=EsmExecutionStatus.target_invocation_error
86
            ).increment()
87
            raise BatchFailureError(error=e.error) from e
1✔
UNCOV
88
        except Exception as e:
×
UNCOV
89
            LOG.error(
×
90
                "Unhandled exception while processing Lambda event source mapping (ESM) events %s for ESM with execution id %s",
91
                events,
92
                execution_id,
93
                exc_info=LOG.isEnabledFor(logging.DEBUG),
94
            )
UNCOV
95
            esm_counter.labels(
×
96
                source=event_source, status=EsmExecutionStatus.unhandled_error
97
            ).increment()
UNCOV
98
            raise e
×
99

100
    def process_target_stage(self, events: list[dict]) -> None:
1✔
101
        try:
1✔
102
            self.logger.log(
1✔
103
                messageType="TargetStageEntered",
104
                logLevel=LogLevel.INFO,
105
            )
106
            # 2) Deliver to target in batches
107
            try:
1✔
108
                self.logger.log(
1✔
109
                    messageType="TargetInvocationStarted",
110
                    logLevel=LogLevel.TRACE,
111
                )
112
                # TODO: handle and log target invocation + stage skipped (when no records present)
113
                payload = self.sender.send_events(events)
1✔
114
                if payload:
1✔
115
                    # TODO: test unserializable content (e.g., byte strings)
116
                    payload = json.dumps(payload)
1✔
117
                else:
118
                    payload = ""
1✔
119
                self.logger.log(
1✔
120
                    messageType="TargetInvocationSucceeded",
121
                    logLevel=LogLevel.TRACE,
122
                )
123
            except PartialFailureSenderError as e:
1✔
124
                self.logger.log(
1✔
125
                    messageType="TargetInvocationPartiallyFailed",
126
                    logLevel=LogLevel.ERROR,
127
                    error=e.error,
128
                )
129
                raise e
1✔
130
            except SenderError as e:
1✔
131
                self.logger.log(
1✔
132
                    messageType="TargetInvocationFailed",
133
                    logLevel=LogLevel.ERROR,
134
                    error=e.error,
135
                )
136
                raise e
1✔
137
            self.logger.log(
1✔
138
                messageType="TargetStageSucceeded",
139
                logLevel=LogLevel.INFO,
140
                payload=payload,
141
            )
142
        except PartialFailureSenderError as e:
1✔
143
            self.logger.log(
1✔
144
                messageType="TargetStagePartiallyFailed",
145
                logLevel=LogLevel.ERROR,
146
                error=e.error,
147
            )
148
            raise e
1✔
149
        except SenderError as e:
1✔
150
            self.logger.log(
1✔
151
                messageType="TargetStageFailed",
152
                logLevel=LogLevel.ERROR,
153
                error=e.error,
154
            )
155
            raise e
1✔
156

157
    def generate_event_failure_context(self, abort_condition: str, **kwargs) -> dict:
1✔
158
        error_payload: dict = kwargs.get("error")
1✔
159
        if not error_payload:
1✔
UNCOV
160
            return {}
×
161
        # TODO: Should 'requestContext' and 'responseContext' be defined as models?
162
        context = {
1✔
163
            "requestContext": {
164
                "requestId": error_payload.get("requestId"),
165
                "functionArn": self.sender.target_arn,  # get the target ARN from the sender (always LambdaSender)
166
                "condition": abort_condition,
167
                "approximateInvokeCount": kwargs.get("attempts_count"),
168
            },
169
            "responseContext": {
170
                "statusCode": error_payload.get("httpStatusCode"),
171
                "executedVersion": error_payload.get("executedVersion"),
172
                "functionError": error_payload.get("functionError"),
173
            },
174
        }
175

176
        return context
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc