• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 16665047018

31 Jul 2025 06:34PM UTC coverage: 86.897% (+0.1%) from 86.781%
16665047018

push

github

web-flow
Apigw/enable vpce routing (#12937)

5 of 5 new or added lines in 1 file covered. (100.0%)

314 existing lines in 13 files now uncovered.

66469 of 76492 relevant lines covered (86.9%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.67
/localstack-core/localstack/services/lambda_/event_source_mapping/esm_worker_factory.py
1
from typing import Callable
1✔
2

3
import botocore.config
1✔
4

5
from localstack.aws.api.lambda_ import (
1✔
6
    EventSourceMappingConfiguration,
7
    FunctionResponseType,
8
)
9
from localstack.aws.api.pipes import (
1✔
10
    DynamoDBStreamStartPosition,
11
    KinesisStreamStartPosition,
12
    PipeSourceDynamoDBStreamParameters,
13
    PipeSourceKinesisStreamParameters,
14
    PipeSourceParameters,
15
    PipeSourceSqsQueueParameters,
16
    PipeTargetInvocationType,
17
    PipeTargetLambdaFunctionParameters,
18
    PipeTargetParameters,
19
)
20
from localstack.services.lambda_ import hooks as lambda_hooks
1✔
21
from localstack.services.lambda_ import ldm
1✔
22
from localstack.services.lambda_.event_source_mapping.esm_event_processor import (
1✔
23
    EsmEventProcessor,
24
)
25
from localstack.services.lambda_.event_source_mapping.esm_worker import EsmStateReason, EsmWorker
1✔
26
from localstack.services.lambda_.event_source_mapping.pipe_loggers.noops_pipe_logger import (
1✔
27
    NoOpsPipeLogger,
28
)
29
from localstack.services.lambda_.event_source_mapping.pipe_utils import (
1✔
30
    get_internal_client,
31
    get_standardized_service_name,
32
)
33
from localstack.services.lambda_.event_source_mapping.pollers.dynamodb_poller import DynamoDBPoller
1✔
34
from localstack.services.lambda_.event_source_mapping.pollers.kinesis_poller import KinesisPoller
1✔
35
from localstack.services.lambda_.event_source_mapping.pollers.poller import Poller
1✔
36
from localstack.services.lambda_.event_source_mapping.pollers.sqs_poller import (
1✔
37
    DEFAULT_MAX_WAIT_TIME_SECONDS,
38
    SqsPoller,
39
)
40
from localstack.services.lambda_.event_source_mapping.senders.lambda_sender import LambdaSender
1✔
41
from localstack.utils.aws.arns import parse_arn
1✔
42
from localstack.utils.aws.client_types import ServicePrincipal
1✔
43

44

45
class PollerHolder:
    """Mutable container for a poller factory supplied by a hook.

    An instance is handed to a hook, which may assign a callable to
    `create_poller_fn`. The caller then invokes that callable to obtain a
    `Poller`, which allows pollers to be created downstream via hooks.
    """

    # Factory used to build the Poller; stays None when nothing has assigned one.
    create_poller_fn: Callable[..., Poller] | None = None
1✔
49

50

51
class EsmWorkerFactory:
    """Builds an `EsmWorker` from an event source mapping (ESM) configuration.

    The factory wires together the three parts of a worker:

    * a `LambdaSender` that invokes the target function named in the config,
    * an `EsmEventProcessor` that forwards polled events to that sender, and
    * a `Poller` selected from the service of the event source ARN.
    """

    esm_config: EventSourceMappingConfiguration
    function_role_arn: str
    enabled: bool

    def __init__(
        self,
        esm_config: EventSourceMappingConfiguration,
        function_role: str,
        enabled: bool,
    ):
        """Store the inputs needed to build the worker later.

        :param esm_config: the event source mapping configuration to build a worker for
        :param function_role: stored as `function_role_arn`; used as the role when creating
            the source client and as the invoke identity of the Kinesis poller
        :param enabled: passed through unchanged to the created `EsmWorker`
        """
        self.esm_config = esm_config
        self.function_role_arn = function_role
        self.enabled = enabled

    def get_esm_worker(self) -> EsmWorker:
        """Create the sender, processor and poller and assemble them into an `EsmWorker`.

        :return: a worker configured from `self.esm_config`
        :raises Exception: if the source service has no built-in poller and no hook
            provided a poller factory
        """
        # Sender (always Lambda)
        sender = self._create_sender()

        # Logger
        logger = NoOpsPipeLogger()

        # Event Source Mapping processor
        esm_processor = EsmEventProcessor(sender=sender, logger=logger)

        # Poller
        source_service = ""
        source_client = None
        source_arn = self.esm_config.get("EventSourceArn", "")
        if source_arn:
            parsed_source_arn = parse_arn(source_arn)
            source_service = get_standardized_service_name(parsed_source_arn["service"])
            source_client = self._create_source_client(source_arn)

        poller = self._create_poller(
            source_service=source_service,
            source_arn=source_arn,
            source_client=source_client,
            esm_processor=esm_processor,
        )

        # SQS mappings report a different state reason than all other sources.
        if source_service == "sqs":
            user_state_reason = EsmStateReason.USER_INITIATED
        else:
            user_state_reason = EsmStateReason.USER_ACTION

        return EsmWorker(
            self.esm_config,
            poller=poller,
            enabled=self.enabled,
            user_state_reason=user_state_reason,
        )

    @staticmethod
    def _get_invoke_read_timeout_seconds():
        """Return the read timeout (in seconds) for the Lambda invoke client."""
        if ldm.IS_LDM_ENABLED:
            return ldm.DEFAULT_LDM_TIMEOUT_SECONDS

        # 900s is the maximum amount of time a Lambda can run for.
        lambda_max_timeout_seconds = 900
        invoke_timeout_buffer_seconds = 5
        return lambda_max_timeout_seconds + invoke_timeout_buffer_seconds

    def _create_sender(self) -> LambdaSender:
        """Create the `LambdaSender` that synchronously invokes the target function."""
        function_arn = self.esm_config["FunctionArn"]
        timeout_seconds = self._get_invoke_read_timeout_seconds()

        lambda_client = get_internal_client(
            arn=function_arn,  # Only the function_arn is necessary since the Lambda should be able to invoke itself
            client_config=botocore.config.Config(
                retries={
                    "total_max_attempts": 1
                },  # Disable retries, to prevent re-invoking the Lambda
                read_timeout=timeout_seconds,
                tcp_keepalive=True,
            ),
        )
        return LambdaSender(
            target_arn=function_arn,
            target_parameters=PipeTargetParameters(
                LambdaFunctionParameters=PipeTargetLambdaFunctionParameters(
                    InvocationType=PipeTargetInvocationType.REQUEST_RESPONSE
                )
            ),
            target_client=lambda_client,
            payload_dict=True,  # TODO: This should be handled better since not all payloads in ESM are in the form { "Records" : List[Dict]}
            report_batch_item_failures=self.esm_config.get("FunctionResponseTypes")
            == [FunctionResponseType.ReportBatchItemFailures],
        )

    def _create_source_client(self, source_arn: str):
        """Create the internal client used to poll the event source at `source_arn`."""
        return get_internal_client(
            arn=source_arn,
            role_arn=self.function_role_arn,
            service_principal=ServicePrincipal.lambda_,
            source_arn=self.esm_config["FunctionArn"],
            client_config=botocore.config.Config(
                retries={"total_max_attempts": 1},  # Disable retries
                read_timeout=max(
                    self.esm_config.get(
                        "MaximumBatchingWindowInSeconds", DEFAULT_MAX_WAIT_TIME_SECONDS
                    ),
                    60,
                )
                + 5,  # Extend read timeout (with 5s buffer) for long-polling
                # Setting tcp_keepalive to true allows the boto client to keep
                # a long-running TCP connection when making calls to the gateway.
                # This ensures long-poll calls do not prematurely have their socket
                # connection marked as stale if no data is transferred for a given
                # period of time hence preventing premature drops or resets of the
                # connection.
                # See https://aws.amazon.com/blogs/networking-and-content-delivery/implementing-long-running-tcp-connections-within-vpc-networking/
                tcp_keepalive=True,
            ),
        )

    def _get_dead_letter_params(self) -> dict:
        """Return `{"DeadLetterConfig": {"Arn": ...}}` if an on-failure destination is set, else `{}`."""
        dead_letter_config_arn = (
            self.esm_config.get("DestinationConfig", {}).get("OnFailure", {}).get("Destination")
        )
        if dead_letter_config_arn:
            return {"DeadLetterConfig": {"Arn": dead_letter_config_arn}}
        return {}

    def _create_stream_parameters(self, parameters_type, start_position_type):
        """Map the ESM config to stream source parameters (shared by Kinesis and DynamoDB Streams).

        :param parameters_type: callable that builds the service-specific parameters
            from keyword arguments
        :param start_position_type: enum-like type indexed by the configured
            `StartingPosition` name
        """
        # TODO: map all supported ESM to Pipe parameters
        optional_params = self._get_dead_letter_params()
        return parameters_type(
            StartingPosition=start_position_type[self.esm_config["StartingPosition"]],
            BatchSize=self.esm_config["BatchSize"],
            MaximumBatchingWindowInSeconds=self.esm_config["MaximumBatchingWindowInSeconds"],
            MaximumRetryAttempts=self.esm_config["MaximumRetryAttempts"],
            MaximumRecordAgeInSeconds=self.esm_config["MaximumRecordAgeInSeconds"],
            **optional_params,
        )

    def _create_poller(
        self,
        source_service: str,
        source_arn: str,
        source_client,
        esm_processor: EsmEventProcessor,
    ) -> Poller:
        """Create the poller matching `source_service`.

        SQS, Kinesis and DynamoDB Streams have built-in pollers. For any other
        service, the `create_event_source_poller` hook is run with a
        `PollerHolder`, and the factory found on the holder afterwards is used.

        :raises Exception: if no built-in poller matches and the holder has no factory
        """
        filter_criteria = self.esm_config.get("FilterCriteria", {"Filters": []})

        if source_service == "sqs":
            source_parameters = PipeSourceParameters(
                FilterCriteria=filter_criteria,
                SqsQueueParameters=PipeSourceSqsQueueParameters(
                    BatchSize=self.esm_config["BatchSize"],
                    MaximumBatchingWindowInSeconds=self.esm_config[
                        "MaximumBatchingWindowInSeconds"
                    ],
                ),
            )
            return SqsPoller(
                source_arn=source_arn,
                source_parameters=source_parameters,
                source_client=source_client,
                processor=esm_processor,
            )

        if source_service == "kinesis":
            kinesis_parameters = self._create_stream_parameters(
                PipeSourceKinesisStreamParameters, KinesisStreamStartPosition
            )
            source_parameters = PipeSourceParameters(
                FilterCriteria=filter_criteria,
                KinesisStreamParameters=kinesis_parameters,
            )
            return KinesisPoller(
                esm_uuid=self.esm_config["UUID"],
                source_arn=source_arn,
                source_parameters=source_parameters,
                source_client=source_client,
                processor=esm_processor,
                invoke_identity_arn=self.function_role_arn,
                kinesis_namespace=True,
            )

        if source_service == "dynamodbstreams":
            dynamodb_parameters = self._create_stream_parameters(
                PipeSourceDynamoDBStreamParameters, DynamoDBStreamStartPosition
            )
            source_parameters = PipeSourceParameters(
                FilterCriteria=filter_criteria,
                DynamoDBStreamParameters=dynamodb_parameters,
            )
            return DynamoDBPoller(
                esm_uuid=self.esm_config["UUID"],
                source_arn=source_arn,
                source_parameters=source_parameters,
                source_client=source_client,
                processor=esm_processor,
            )

        # No built-in poller: give hooks a chance to provide a poller factory.
        poller_holder = PollerHolder()
        lambda_hooks.create_event_source_poller.run(
            poller_holder, source_service, self.esm_config
        )

        if not poller_holder.create_poller_fn:
            raise Exception(
                f"Unsupported event source mapping source service {source_service}. Please upvote or create a feature request."
            )

        return poller_holder.create_poller_fn(
            arn=source_arn,
            client=source_client,
            processor=esm_processor,
        )
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc