• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 97a8a4aa-ae20-4b84-ac2d-44cdd596c560

26 Mar 2025 12:46PM UTC coverage: 86.871% (+0.009%) from 86.862%
97a8a4aa-ae20-4b84-ac2d-44cdd596c560

push

circleci

web-flow
CloudFormation: Fix LoggingConfiguration Parameter Handling in StepFunctions Resource Provider (#12433)

5 of 6 new or added lines in 1 file covered. (83.33%)

42 existing lines in 7 files now uncovered.

63261 of 72822 relevant lines covered (86.87%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.67
/localstack-core/localstack/services/lambda_/event_source_mapping/esm_worker_factory.py
1
from typing import Callable
1✔
2

3
import botocore.config
1✔
4

5
from localstack.aws.api.lambda_ import (
1✔
6
    EventSourceMappingConfiguration,
7
    FunctionResponseType,
8
)
9
from localstack.aws.api.pipes import (
1✔
10
    DynamoDBStreamStartPosition,
11
    KinesisStreamStartPosition,
12
    PipeSourceDynamoDBStreamParameters,
13
    PipeSourceKinesisStreamParameters,
14
    PipeSourceParameters,
15
    PipeSourceSqsQueueParameters,
16
    PipeTargetInvocationType,
17
    PipeTargetLambdaFunctionParameters,
18
    PipeTargetParameters,
19
)
20
from localstack.services.lambda_ import hooks as lambda_hooks
1✔
21
from localstack.services.lambda_.event_source_mapping.esm_event_processor import (
1✔
22
    EsmEventProcessor,
23
)
24
from localstack.services.lambda_.event_source_mapping.esm_worker import EsmStateReason, EsmWorker
1✔
25
from localstack.services.lambda_.event_source_mapping.pipe_loggers.noops_pipe_logger import (
1✔
26
    NoOpsPipeLogger,
27
)
28
from localstack.services.lambda_.event_source_mapping.pipe_utils import (
1✔
29
    get_internal_client,
30
    get_standardized_service_name,
31
)
32
from localstack.services.lambda_.event_source_mapping.pollers.dynamodb_poller import DynamoDBPoller
1✔
33
from localstack.services.lambda_.event_source_mapping.pollers.kinesis_poller import KinesisPoller
1✔
34
from localstack.services.lambda_.event_source_mapping.pollers.poller import Poller
1✔
35
from localstack.services.lambda_.event_source_mapping.pollers.sqs_poller import (
1✔
36
    DEFAULT_MAX_WAIT_TIME_SECONDS,
37
    SqsPoller,
38
)
39
from localstack.services.lambda_.event_source_mapping.senders.lambda_sender import LambdaSender
1✔
40
from localstack.utils.aws.arns import parse_arn
1✔
41
from localstack.utils.aws.client_types import ServicePrincipal
1✔
42
from localstack.utils.lambda_debug_mode.lambda_debug_mode import (
1✔
43
    DEFAULT_LAMBDA_DEBUG_MODE_TIMEOUT_SECONDS,
44
    is_lambda_debug_mode,
45
)
46

47

48
class PollerHolder:
    """Mutable container for a poller factory supplied through hooks.

    `create_poller_fn` starts out as `None`. A hook implementation may assign a
    callable to it; that callable is then invoked to build the `Poller`, which
    allows pollers to be created downstream of this module.
    """

    # Callable returning a Poller; stays None until something assigns it.
    create_poller_fn: Callable[..., Poller] | None = None
52

53

54
class EsmWorkerFactory:
    """Builds an `EsmWorker` from an event source mapping (ESM) configuration.

    The worker is assembled from a Lambda sender, an event processor and a poller.
    The poller is chosen from the service named in the configuration's
    `EventSourceArn`: SQS, Kinesis, DynamoDB Streams, or, for anything else,
    a poller contributed through the `create_event_source_poller` hook.
    """

    esm_config: EventSourceMappingConfiguration
    function_role_arn: str
    enabled: bool

    def __init__(
        self,
        esm_config: EventSourceMappingConfiguration,
        function_role: str,
        enabled: bool,
    ):
        """Store the inputs needed to build the worker later.

        :param esm_config: the event source mapping configuration to build a worker for
        :param function_role: role ARN, stored as `function_role_arn` and used when
            creating the client for the event source
        :param enabled: forwarded unchanged to the created `EsmWorker`
        """
        self.esm_config = esm_config
        self.function_role_arn = function_role
        self.enabled = enabled

    def get_esm_worker(self) -> EsmWorker:
        """Create the `EsmWorker` for the stored event source mapping configuration.

        :return: a worker wired with the poller matching the event source service
        :raises Exception: if the source service is not handled here and no hook
            provides a poller factory for it
        """
        # Sender (always Lambda)
        sender = self._create_sender()

        # Logger
        logger = NoOpsPipeLogger()

        # Event Source Mapping processor
        esm_processor = EsmEventProcessor(sender=sender, logger=logger)

        # Poller
        source_service = ""
        source_client = None
        source_arn = self.esm_config.get("EventSourceArn", "")
        if source_arn:
            parsed_source_arn = parse_arn(source_arn)
            source_service = get_standardized_service_name(parsed_source_arn["service"])
            source_client = self._create_source_client(source_arn)

        filter_criteria = self.esm_config.get("FilterCriteria", {"Filters": []})
        user_state_reason = EsmStateReason.USER_ACTION
        poller: Poller
        if source_service == "sqs":
            # SQS is the only source that reports a different user state reason.
            user_state_reason = EsmStateReason.USER_INITIATED
            poller = self._create_sqs_poller(
                source_arn, source_client, esm_processor, filter_criteria
            )
        elif source_service == "kinesis":
            poller = self._create_kinesis_poller(
                source_arn, source_client, esm_processor, filter_criteria
            )
        elif source_service == "dynamodbstreams":
            poller = self._create_dynamodb_poller(
                source_arn, source_client, esm_processor, filter_criteria
            )
        else:
            poller = self._create_hook_poller(
                source_service, source_arn, source_client, esm_processor
            )

        return EsmWorker(
            self.esm_config,
            poller=poller,
            enabled=self.enabled,
            user_state_reason=user_state_reason,
        )

    def _create_sender(self) -> LambdaSender:
        """Create the sender that synchronously invokes the target Lambda function."""
        function_arn = self.esm_config["FunctionArn"]

        if is_lambda_debug_mode():
            timeout_seconds = DEFAULT_LAMBDA_DEBUG_MODE_TIMEOUT_SECONDS
        else:
            # 900s is the maximum amount of time a Lambda can run for.
            lambda_max_timeout_seconds = 900
            invoke_timeout_buffer_seconds = 5
            timeout_seconds = lambda_max_timeout_seconds + invoke_timeout_buffer_seconds

        lambda_client = get_internal_client(
            arn=function_arn,  # Only the function_arn is necessary since the Lambda should be able to invoke itself
            client_config=botocore.config.Config(
                retries={
                    "total_max_attempts": 1
                },  # Disable retries, to prevent re-invoking the Lambda
                read_timeout=timeout_seconds,
                tcp_keepalive=True,
            ),
        )
        return LambdaSender(
            target_arn=function_arn,
            target_parameters=PipeTargetParameters(
                LambdaFunctionParameters=PipeTargetLambdaFunctionParameters(
                    InvocationType=PipeTargetInvocationType.REQUEST_RESPONSE
                )
            ),
            target_client=lambda_client,
            payload_dict=True,  # TODO: This should be handled better since not all payloads in ESM are in the form { "Records" : List[Dict]}
            report_batch_item_failures=self.esm_config.get("FunctionResponseTypes")
            == [FunctionResponseType.ReportBatchItemFailures],
        )

    def _create_source_client(self, source_arn: str):
        """Create the client used to poll the event source identified by `source_arn`."""
        # Extend read timeout (with 5s buffer) for long-polling; never below 60s + buffer.
        read_timeout = (
            max(
                self.esm_config.get(
                    "MaximumBatchingWindowInSeconds", DEFAULT_MAX_WAIT_TIME_SECONDS
                ),
                60,
            )
            + 5
        )
        return get_internal_client(
            arn=source_arn,
            role_arn=self.function_role_arn,
            service_principal=ServicePrincipal.lambda_,
            source_arn=self.esm_config["FunctionArn"],
            client_config=botocore.config.Config(
                retries={"total_max_attempts": 1},  # Disable retries
                read_timeout=read_timeout,
                # Setting tcp_keepalive to true allows the boto client to keep
                # a long-running TCP connection when making calls to the gateway.
                # This ensures long-poll calls do not prematurely have their socket
                # connection marked as stale if no data is transferred for a given
                # period of time hence preventing premature drops or resets of the
                # connection.
                # See https://aws.amazon.com/blogs/networking-and-content-delivery/implementing-long-running-tcp-connections-within-vpc-networking/
                tcp_keepalive=True,
            ),
        )

    def _dead_letter_params(self) -> dict:
        """Map the ESM on-failure destination to the optional Pipe `DeadLetterConfig` parameter.

        :return: `{"DeadLetterConfig": {"Arn": ...}}` if a destination is configured, else `{}`
        """
        dead_letter_config_arn = (
            self.esm_config.get("DestinationConfig", {}).get("OnFailure", {}).get("Destination")
        )
        if dead_letter_config_arn:
            return {"DeadLetterConfig": {"Arn": dead_letter_config_arn}}
        return {}

    def _create_sqs_poller(
        self, source_arn: str, source_client, processor: EsmEventProcessor, filter_criteria
    ) -> Poller:
        """Create the poller for an SQS queue event source."""
        source_parameters = PipeSourceParameters(
            FilterCriteria=filter_criteria,
            SqsQueueParameters=PipeSourceSqsQueueParameters(
                BatchSize=self.esm_config["BatchSize"],
                MaximumBatchingWindowInSeconds=self.esm_config[
                    "MaximumBatchingWindowInSeconds"
                ],
            ),
        )
        return SqsPoller(
            source_arn=source_arn,
            source_parameters=source_parameters,
            source_client=source_client,
            processor=processor,
        )

    def _create_kinesis_poller(
        self, source_arn: str, source_client, processor: EsmEventProcessor, filter_criteria
    ) -> Poller:
        """Create the poller for a Kinesis stream event source."""
        # TODO: map all supported ESM to Pipe parameters
        optional_params = self._dead_letter_params()
        source_parameters = PipeSourceParameters(
            FilterCriteria=filter_criteria,
            KinesisStreamParameters=PipeSourceKinesisStreamParameters(
                StartingPosition=KinesisStreamStartPosition[
                    self.esm_config["StartingPosition"]
                ],
                BatchSize=self.esm_config["BatchSize"],
                MaximumBatchingWindowInSeconds=self.esm_config[
                    "MaximumBatchingWindowInSeconds"
                ],
                MaximumRetryAttempts=self.esm_config["MaximumRetryAttempts"],
                **optional_params,
            ),
        )
        return KinesisPoller(
            esm_uuid=self.esm_config["UUID"],
            source_arn=source_arn,
            source_parameters=source_parameters,
            source_client=source_client,
            processor=processor,
            invoke_identity_arn=self.function_role_arn,
            kinesis_namespace=True,
        )

    def _create_dynamodb_poller(
        self, source_arn: str, source_client, processor: EsmEventProcessor, filter_criteria
    ) -> Poller:
        """Create the poller for a DynamoDB Streams event source."""
        # TODO: map all supported ESM to Pipe parameters
        optional_params = self._dead_letter_params()
        source_parameters = PipeSourceParameters(
            FilterCriteria=filter_criteria,
            DynamoDBStreamParameters=PipeSourceDynamoDBStreamParameters(
                StartingPosition=DynamoDBStreamStartPosition[
                    self.esm_config["StartingPosition"]
                ],
                BatchSize=self.esm_config["BatchSize"],
                MaximumBatchingWindowInSeconds=self.esm_config[
                    "MaximumBatchingWindowInSeconds"
                ],
                MaximumRetryAttempts=self.esm_config["MaximumRetryAttempts"],
                **optional_params,
            ),
        )
        return DynamoDBPoller(
            esm_uuid=self.esm_config["UUID"],
            source_arn=source_arn,
            source_parameters=source_parameters,
            source_client=source_client,
            processor=processor,
        )

    def _create_hook_poller(
        self, source_service: str, source_arn: str, source_client, processor: EsmEventProcessor
    ) -> Poller:
        """Create a poller for a source service not handled here, via the poller hook.

        :raises Exception: if no hook assigned `create_poller_fn` on the holder
        """
        poller_holder = PollerHolder()
        # Hooks receive the holder and may set `create_poller_fn` on it.
        lambda_hooks.create_event_source_poller.run(
            poller_holder, source_service, self.esm_config
        )

        if not poller_holder.create_poller_fn:
            raise Exception(
                f"Unsupported event source mapping source service {source_service}. Please upvote or create a feature request."
            )

        return poller_holder.create_poller_fn(
            arn=source_arn,
            client=source_client,
            processor=processor,
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc