• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 16820655284

07 Aug 2025 05:03PM UTC coverage: 86.841% (-0.05%) from 86.892%
16820655284

push

github

web-flow
CFNV2: support CDK bootstrap and deployment (#12967)

32 of 38 new or added lines in 5 files covered. (84.21%)

2013 existing lines in 125 files now uncovered.

66606 of 76699 relevant lines covered (86.84%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.35
/localstack-core/localstack/testing/aws/lambda_utils.py
1
import itertools
1✔
2
import json
1✔
3
import logging
1✔
4
import os
1✔
5
import subprocess
1✔
6
import zipfile
1✔
7
from collections.abc import Mapping, Sequence
1✔
8
from pathlib import Path
1✔
9
from typing import TYPE_CHECKING, Literal, Optional, overload
1✔
10

11
from localstack import config
1✔
12
from localstack.services.lambda_.runtimes import RUNTIMES_AGGREGATED
1✔
13
from localstack.utils.files import load_file
1✔
14
from localstack.utils.platform import Arch, get_arch
1✔
15
from localstack.utils.strings import short_uid
1✔
16
from localstack.utils.sync import ShortCircuitWaitException, retry
1✔
17
from localstack.utils.testutil import get_lambda_log_events
1✔
18

19
if TYPE_CHECKING:
1✔
20
    from mypy_boto3_lambda import LambdaClient
×
21
    from mypy_boto3_lambda.literals import ArchitectureType, PackageTypeType, RuntimeType
×
UNCOV
22
    from mypy_boto3_lambda.type_defs import (
×
23
        DeadLetterConfigTypeDef,
24
        EnvironmentTypeDef,
25
        EphemeralStorageTypeDef,
26
        FileSystemConfigTypeDef,
27
        FunctionCodeTypeDef,
28
        FunctionConfigurationResponseMetadataTypeDef,
29
        ImageConfigTypeDef,
30
        TracingConfigTypeDef,
31
        VpcConfigTypeDef,
32
    )
33

34
LOG = logging.getLogger(__name__)
1✔
35

36
HANDLERS = {
1✔
37
    **dict.fromkeys(RUNTIMES_AGGREGATED.get("nodejs"), "index.handler"),
38
    **dict.fromkeys(RUNTIMES_AGGREGATED.get("python"), "handler.handler"),
39
    **dict.fromkeys(RUNTIMES_AGGREGATED.get("java"), "echo.Handler"),
40
    **dict.fromkeys(RUNTIMES_AGGREGATED.get("ruby"), "function.handler"),
41
    **dict.fromkeys(RUNTIMES_AGGREGATED.get("dotnet"), "dotnet::Dotnet.Function::FunctionHandler"),
42
    # The handler value does not matter unless the custom runtime reads it in some way, but it is a required field.
43
    **dict.fromkeys(RUNTIMES_AGGREGATED.get("provided"), "function.handler"),
44
}
45

46
PACKAGE_FOR_RUNTIME = {
1✔
47
    **dict.fromkeys(RUNTIMES_AGGREGATED.get("nodejs"), "nodejs"),
48
    **dict.fromkeys(RUNTIMES_AGGREGATED.get("python"), "python"),
49
    **dict.fromkeys(RUNTIMES_AGGREGATED.get("java"), "java"),
50
    **dict.fromkeys(RUNTIMES_AGGREGATED.get("ruby"), "ruby"),
51
    **dict.fromkeys(RUNTIMES_AGGREGATED.get("dotnet"), "dotnet"),
52
    **dict.fromkeys(RUNTIMES_AGGREGATED.get("provided"), "provided"),
53
}
54

55

56
def generate_tests(metafunc):
1✔
57
    i = next(metafunc.definition.iter_markers("multiruntime"), None)
1✔
58
    if not i:
1✔
59
        return
1✔
60
    if i.args:
1✔
UNCOV
61
        raise ValueError("doofus")
×
62

63
    scenario = i.kwargs["scenario"]
1✔
64
    runtimes = i.kwargs.get("runtimes")
1✔
65
    if not runtimes:
1✔
66
        runtimes = list(RUNTIMES_AGGREGATED.keys())
1✔
67
    ids = list(
1✔
68
        itertools.chain.from_iterable(
69
            RUNTIMES_AGGREGATED.get(runtime) or [runtime] for runtime in runtimes
70
        )
71
    )
72
    arg_values = [(scenario, runtime, HANDLERS[runtime]) for runtime in ids]
1✔
73

74
    metafunc.parametrize(
1✔
75
        argvalues=arg_values,
76
        argnames="multiruntime_lambda",
77
        indirect=True,
78
        ids=ids,
79
    )
80

81

82
def package_for_lang(scenario: str, runtime: str, root_folder: Path) -> str:
1✔
83
    """
84
    :param scenario: which scenario to run
85
    :param runtime: which runtime to build
86
    :param root_folder: The root folder for the scenarios
87
    :return: path to built zip file
88
    """
89
    runtime_folder = PACKAGE_FOR_RUNTIME[runtime]
1✔
90

91
    common_dir = root_folder / "functions" / "common"
1✔
92
    scenario_dir = common_dir / scenario
1✔
93
    runtime_dir_candidate = scenario_dir / runtime
1✔
94
    generic_runtime_dir_candidate = scenario_dir / runtime_folder
1✔
95

96
    # if a more specific folder exists, use that one
97
    # otherwise: try to fall back to generic runtime (e.g. python for python3.12)
98
    if runtime_dir_candidate.exists() and runtime_dir_candidate.is_dir():
1✔
99
        runtime_dir = runtime_dir_candidate
1✔
100
    else:
101
        runtime_dir = generic_runtime_dir_candidate
1✔
102

103
    build_dir = runtime_dir / "build"
1✔
104
    package_path = runtime_dir / "handler.zip"
1✔
105

106
    # caching step
107
    # TODO: add invalidation (e.g. via storing a hash besides this of all files in src)
108
    if os.path.exists(package_path) and os.path.isfile(package_path):
1✔
109
        return package_path
1✔
110

111
    # packaging
112
    # Use the default Lambda architecture x86_64 unless the ignore architecture flag is configured.
113
    # This enables local testing of both architectures on multi-architecture platforms such as Apple Silicon machines.
114
    architecture = "x86_64"
×
115
    if config.LAMBDA_IGNORE_ARCHITECTURE:
×
116
        architecture = "arm64" if get_arch() == Arch.arm64 else "x86_64"
×
117
    build_cmd = ["make", "build", f"ARCHITECTURE={architecture}"]
×
UNCOV
118
    LOG.debug(
×
119
        "Building Lambda function for scenario %s and runtime %s using %s.",
120
        scenario,
121
        runtime,
122
        " ".join(build_cmd),
123
    )
124
    result = subprocess.run(build_cmd, cwd=runtime_dir)
×
125
    if result.returncode != 0:
×
UNCOV
126
        raise Exception(
×
127
            f"Failed to build multiruntime {scenario=} for {runtime=} with error code: {result.returncode}"
128
        )
129

130
    # check again if the zip file is now present
131
    if os.path.exists(package_path) and os.path.isfile(package_path):
×
UNCOV
132
        return package_path
×
133

134
    # check something is in build now
135
    target_empty = len(os.listdir(build_dir)) <= 0
×
136
    if target_empty:
×
UNCOV
137
        raise Exception(f"Failed to build multiruntime {scenario=} for {runtime=} ")
×
138

139
    with zipfile.ZipFile(package_path, "w", strict_timestamps=True) as zf:
×
140
        for root, dirs, files in os.walk(build_dir):
×
141
            rel_dir = os.path.relpath(root, build_dir)
×
142
            for f in files:
×
UNCOV
143
                zf.write(os.path.join(root, f), arcname=os.path.join(rel_dir, f))
×
144

145
    # make sure package file has been generated
146
    assert package_path.exists() and package_path.is_file()
×
UNCOV
147
    return package_path
×
148

149

150
class ParametrizedLambda:
1✔
151
    lambda_client: "LambdaClient"
1✔
152
    function_names: list[str]
1✔
153
    scenario: str
1✔
154
    runtime: str
1✔
155
    handler: str
1✔
156
    zip_file_path: str
1✔
157
    role: str
1✔
158

159
    def __init__(
1✔
160
        self,
161
        lambda_client: "LambdaClient",
162
        scenario: str,
163
        runtime: str,
164
        handler: str,
165
        zip_file_path: str,
166
        role: str,
167
    ):
168
        self.function_names = []
1✔
169
        self.lambda_client = lambda_client
1✔
170
        self.scenario = scenario
1✔
171
        self.runtime = runtime
1✔
172
        self.handler = handler
1✔
173
        self.zip_file_path = zip_file_path
1✔
174
        self.role = role
1✔
175

176
    @overload
1✔
177
    def create_function(
1✔
178
        self,
179
        *,
180
        FunctionName: str | None = None,
181
        Role: str | None = None,
182
        Code: Optional["FunctionCodeTypeDef"] = None,
183
        Runtime: Optional["RuntimeType"] = None,
184
        Handler: str | None = None,
185
        Description: str | None = None,
186
        Timeout: int | None = None,
187
        MemorySize: int | None = None,
188
        Publish: bool | None = None,
189
        VpcConfig: Optional["VpcConfigTypeDef"] = None,
190
        PackageType: Optional["PackageTypeType"] = None,
191
        DeadLetterConfig: Optional["DeadLetterConfigTypeDef"] = None,
192
        Environment: Optional["EnvironmentTypeDef"] = None,
193
        KMSKeyArn: str | None = None,
194
        TracingConfig: Optional["TracingConfigTypeDef"] = None,
195
        Tags: Mapping[str, str] | None = None,
196
        Layers: Sequence[str] | None = None,
197
        FileSystemConfigs: Sequence["FileSystemConfigTypeDef"] | None = None,
198
        ImageConfig: Optional["ImageConfigTypeDef"] = None,
199
        CodeSigningConfigArn: str | None = None,
200
        Architectures: Sequence["ArchitectureType"] | None = None,
201
        EphemeralStorage: Optional["EphemeralStorageTypeDef"] = None,
202
    ) -> "FunctionConfigurationResponseMetadataTypeDef": ...
203

204
    def create_function(self, **kwargs):
1✔
205
        kwargs.setdefault("FunctionName", f"{self.scenario}-{short_uid()}")
1✔
206
        kwargs.setdefault("Runtime", self.runtime)
1✔
207
        kwargs.setdefault("Handler", self.handler)
1✔
208
        kwargs.setdefault("Role", self.role)
1✔
209
        kwargs.setdefault("Code", {"ZipFile": load_file(self.zip_file_path, mode="rb")})
1✔
210

211
        def _create_function():
1✔
212
            return self.lambda_client.create_function(**kwargs)
1✔
213

214
        # @AWS, takes about 10s until the role/policy is "active", until then it will fail
215
        # localstack should normally not require the retries and will just continue here
216
        result = retry(_create_function, retries=3, sleep=4)
1✔
217
        self.function_names.append(result["FunctionArn"])
1✔
218
        self.lambda_client.get_waiter("function_active_v2").wait(
1✔
219
            FunctionName=kwargs.get("FunctionName")
220
        )
221

222
        return result
1✔
223

224
    def destroy(self):
1✔
225
        for function_name in self.function_names:
1✔
226
            try:
1✔
227
                self.lambda_client.delete_function(FunctionName=function_name)
1✔
228
            except Exception as e:
×
UNCOV
229
                LOG.debug("Error deleting function %s: %s", function_name, e)
×
230

231

232
def update_done(client, function_name):
1✔
233
    """wait fn for checking 'LastUpdateStatus' of lambda"""
234

235
    def _update_done():
×
UNCOV
236
        last_update_status = client.get_function_configuration(FunctionName=function_name)[
×
237
            "LastUpdateStatus"
238
        ]
239
        if last_update_status == "Failed":
×
UNCOV
240
            raise ShortCircuitWaitException(f"Lambda Config update failed: {last_update_status=}")
×
241
        else:
UNCOV
242
            return last_update_status == "Successful"
×
243

UNCOV
244
    return _update_done
×
245

246

247
def concurrency_update_done(client, function_name, qualifier):
1✔
248
    """wait fn for ProvisionedConcurrencyConfig 'Status'"""
249

250
    def _concurrency_update_done():
1✔
251
        status = client.get_provisioned_concurrency_config(
1✔
252
            FunctionName=function_name, Qualifier=qualifier
253
        )["Status"]
254
        if status == "FAILED":
1✔
UNCOV
255
            raise ShortCircuitWaitException(f"Concurrency update failed: {status=}")
×
256
        else:
257
            return status == "READY"
1✔
258

259
    return _concurrency_update_done
1✔
260

261

262
def get_invoke_init_type(
1✔
263
    client, function_name, qualifier
264
) -> Literal["on-demand", "provisioned-concurrency"]:
265
    """check the environment in the lambda for AWS_LAMBDA_INITIALIZATION_TYPE indicating ondemand/provisioned"""
266
    invoke_result = client.invoke(FunctionName=function_name, Qualifier=qualifier)
×
UNCOV
267
    return json.load(invoke_result["Payload"])
×
268

269

270
lambda_role = {
1✔
271
    "Version": "2012-10-17",
272
    "Statement": [
273
        {
274
            "Effect": "Allow",
275
            "Principal": {"Service": "lambda.amazonaws.com"},
276
            "Action": "sts:AssumeRole",
277
        }
278
    ],
279
}
280
esm_lambda_permission = {
1✔
281
    "Version": "2012-10-17",
282
    "Statement": [
283
        {
284
            "Effect": "Allow",
285
            "Action": [
286
                "sqs:*",
287
                "sns:*",
288
                "dynamodb:DescribeStream",
289
                "dynamodb:GetRecords",
290
                "dynamodb:GetShardIterator",
291
                "dynamodb:ListStreams",
292
                "kinesis:DescribeStream",
293
                "kinesis:DescribeStreamSummary",
294
                "kinesis:GetRecords",
295
                "kinesis:GetShardIterator",
296
                "kinesis:ListShards",
297
                "kinesis:ListStreams",
298
                "kinesis:SubscribeToShard",
299
                "logs:CreateLogGroup",
300
                "logs:CreateLogStream",
301
                "logs:PutLogEvents",
302
                "s3:ListBucket",
303
                "s3:PutObject",
304
            ],
305
            "Resource": ["*"],
306
        }
307
    ],
308
}
309

310

311
def _await_event_source_mapping_state(lambda_client, uuid, state, retries=30):
1✔
312
    def assert_mapping_disabled():
1✔
313
        assert lambda_client.get_event_source_mapping(UUID=uuid)["State"] == state
1✔
314

315
    retry(assert_mapping_disabled, sleep_before=2, retries=retries)
1✔
316

317

318
def _await_event_source_mapping_enabled(lambda_client, uuid, retries=30):
1✔
319
    return _await_event_source_mapping_state(
1✔
320
        lambda_client=lambda_client, uuid=uuid, retries=retries, state="Enabled"
321
    )
322

323

324
def _await_dynamodb_table_active(dynamodb_client, table_name, retries=6):
1✔
325
    def assert_table_active():
1✔
326
        assert (
1✔
327
            dynamodb_client.describe_table(TableName=table_name)["Table"]["TableStatus"] == "ACTIVE"
328
        )
329

330
    retry(assert_table_active, retries=retries, sleep_before=2)
1✔
331

332

333
def _get_lambda_invocation_events(logs_client, function_name, expected_num_events, retries=30):
1✔
334
    def get_events():
1✔
335
        events = get_lambda_log_events(function_name, logs_client=logs_client)
1✔
336
        assert len(events) == expected_num_events
1✔
337
        return events
1✔
338

339
    return retry(get_events, retries=retries, sleep_before=5, sleep=5)
1✔
340

341

342
def is_docker_runtime_executor():
1✔
343
    return config.LAMBDA_RUNTIME_EXECUTOR in ["docker", ""]
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc