• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 21502801941

29 Jan 2026 06:53PM UTC coverage: 86.962% (+0.007%) from 86.955%
21502801941

push

github

web-flow
Route53: add `Update` support for `RecordSet` resource (#13627)

26 of 28 new or added lines in 1 file covered. (92.86%)

163 existing lines in 4 files now uncovered.

70379 of 80931 relevant lines covered (86.96%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.83
/localstack-core/localstack/testing/aws/lambda_utils.py
1
import itertools
1✔
2
import json
1✔
3
import logging
1✔
4
import os
1✔
5
import subprocess
1✔
6
import zipfile
1✔
7
from collections.abc import Mapping, Sequence
1✔
8
from pathlib import Path
1✔
9
from typing import TYPE_CHECKING, Optional, overload
1✔
10

11
from localstack import config
1✔
12
from localstack.services.lambda_.invocation.lambda_models import InitializationType
1✔
13
from localstack.services.lambda_.runtimes import RUNTIMES_AGGREGATED
1✔
14
from localstack.utils.files import load_file
1✔
15
from localstack.utils.platform import Arch, get_arch
1✔
16
from localstack.utils.strings import short_uid
1✔
17
from localstack.utils.sync import ShortCircuitWaitException, retry, wait_until
1✔
18
from localstack.utils.testutil import get_lambda_log_events
1✔
19

20
if TYPE_CHECKING:
1✔
21
    from mypy_boto3_lambda import LambdaClient
×
22
    from mypy_boto3_lambda.literals import ArchitectureType, PackageTypeType, RuntimeType
×
23
    from mypy_boto3_lambda.type_defs import (
×
24
        CapacityProviderConfigTypeDef,
25
        DeadLetterConfigTypeDef,
26
        EnvironmentTypeDef,
27
        EphemeralStorageTypeDef,
28
        FileSystemConfigTypeDef,
29
        FunctionCodeTypeDef,
30
        FunctionConfigurationResponseMetadataTypeDef,
31
        ImageConfigTypeDef,
32
        TracingConfigTypeDef,
33
        VpcConfigTypeDef,
34
    )
35

36
LOG = logging.getLogger(__name__)

# Handler entry point for every concrete runtime, derived from its runtime family.
# Each family's runtimes are looked up in RUNTIMES_AGGREGATED and all of them share one handler.
HANDLERS = {
    runtime: handler
    for family, handler in (
        ("nodejs", "index.handler"),
        ("python", "handler.handler"),
        ("java", "echo.Handler"),
        ("ruby", "function.handler"),
        ("dotnet", "dotnet::Dotnet.Function::FunctionHandler"),
        # The handler value does not matter unless the custom runtime reads it in some way,
        # but it is a required field.
        ("provided", "function.handler"),
    )
    for runtime in RUNTIMES_AGGREGATED.get(family)
}

# Name of the generic (family-level) package folder for every concrete runtime.
PACKAGE_FOR_RUNTIME = {
    runtime: family
    for family in ("nodejs", "python", "java", "ruby", "dotnet", "provided")
    for runtime in RUNTIMES_AGGREGATED.get(family)
}
56

57

58
def generate_tests(metafunc):
    """
    Parametrize a test carrying the ``multiruntime`` marker with one case per runtime.

    Tests without the marker are left untouched. The marker must be configured with
    keyword arguments only: ``scenario`` (required) and ``runtimes`` (optional; when
    missing or empty, every key of ``RUNTIMES_AGGREGATED`` is used). Each entry in
    ``runtimes`` that is a key of ``RUNTIMES_AGGREGATED`` is expanded to that key's
    runtimes; any other entry is used as a runtime identifier as-is.

    :param metafunc: object exposing ``definition.iter_markers(name)`` and ``parametrize(...)``
    :raises ValueError: if the marker was given positional arguments
    :raises KeyError: if ``scenario`` is missing or a runtime has no entry in ``HANDLERS``
    """
    marker = next(metafunc.definition.iter_markers("multiruntime"), None)
    if not marker:
        return
    if marker.args:
        raise ValueError(
            "The 'multiruntime' marker only accepts keyword arguments "
            f"('scenario', 'runtimes'), but got positional arguments: {marker.args!r}"
        )

    scenario = marker.kwargs["scenario"]
    runtimes = marker.kwargs.get("runtimes")
    if not runtimes:
        runtimes = list(RUNTIMES_AGGREGATED.keys())

    # Expand aggregated names into concrete runtimes; keep unknown names unchanged.
    ids = list(
        itertools.chain.from_iterable(
            RUNTIMES_AGGREGATED.get(runtime) or [runtime] for runtime in runtimes
        )
    )
    arg_values = [(scenario, runtime, HANDLERS[runtime]) for runtime in ids]

    metafunc.parametrize(
        argvalues=arg_values,
        argnames="multiruntime_lambda",
        indirect=True,
        ids=ids,
    )
82

83

84
def package_for_lang(scenario: str, runtime: str, root_folder: Path) -> Path:
    """
    Return the path of the Lambda deployment zip for a scenario/runtime, building it if needed.

    The runtime-specific folder (``<scenario>/<runtime>``) is preferred; when it does not
    exist, the generic folder from ``PACKAGE_FOR_RUNTIME`` is used. An existing
    ``handler.zip`` in that folder is returned without rebuilding. Otherwise ``make build``
    is run in the folder; if that does not produce ``handler.zip`` itself, the contents of
    the ``build`` sub-folder are zipped into it.

    :param scenario: which scenario to run
    :param runtime: which runtime to build
    :param root_folder: The root folder for the scenarios
    :return: path to built zip file
    :raises KeyError: if ``runtime`` has no entry in ``PACKAGE_FOR_RUNTIME``
    :raises Exception: if the build command fails or leaves nothing to package
    """
    runtime_folder = PACKAGE_FOR_RUNTIME[runtime]

    scenario_dir = root_folder / "functions" / "common" / scenario
    runtime_dir_candidate = scenario_dir / runtime

    # if a more specific folder exists, use that one
    # otherwise: try to fall back to generic runtime (e.g. python for python3.12)
    if runtime_dir_candidate.is_dir():
        runtime_dir = runtime_dir_candidate
    else:
        runtime_dir = scenario_dir / runtime_folder

    build_dir = runtime_dir / "build"
    package_path = runtime_dir / "handler.zip"

    # caching step
    # TODO: add invalidation (e.g. via storing a hash besides this of all files in src)
    if package_path.is_file():
        return package_path

    # packaging
    # Use the default Lambda architecture x86_64 unless the ignore architecture flag is configured.
    # This enables local testing of both architectures on multi-architecture platforms such as Apple Silicon machines.
    architecture = "x86_64"
    if config.LAMBDA_IGNORE_ARCHITECTURE:
        architecture = "arm64" if get_arch() == Arch.arm64 else "x86_64"
    build_cmd = ["make", "build", f"ARCHITECTURE={architecture}"]
    LOG.debug(
        "Building Lambda function for scenario %s and runtime %s using %s.",
        scenario,
        runtime,
        " ".join(build_cmd),
    )
    result = subprocess.run(build_cmd, cwd=runtime_dir)
    if result.returncode != 0:
        raise Exception(
            f"Failed to build multiruntime {scenario=} for {runtime=} with error code: {result.returncode}"
        )

    # the build itself may already have produced the zip file
    if package_path.is_file():
        return package_path

    # A missing build directory is reported the same way as an empty one instead of
    # surfacing as a bare FileNotFoundError from the directory listing.
    if not build_dir.is_dir() or not any(build_dir.iterdir()):
        raise Exception(
            f"Failed to build multiruntime {scenario=} for {runtime=}: "
            f"build directory {build_dir} is missing or empty"
        )

    try:
        with zipfile.ZipFile(package_path, "w", strict_timestamps=True) as zf:
            for root, _dirs, files in os.walk(build_dir):
                rel_dir = os.path.relpath(root, build_dir)
                for f in files:
                    zf.write(os.path.join(root, f), arcname=os.path.join(rel_dir, f))
    except Exception:
        # Do not leave a partial archive behind: the caching step above would
        # otherwise return it on every subsequent call.
        package_path.unlink(missing_ok=True)
        raise

    # make sure package file has been generated (explicit check, since asserts vanish under -O)
    if not package_path.is_file():
        raise Exception(
            f"Failed to package multiruntime {scenario=} for {runtime=}: {package_path} was not created"
        )
    return package_path
×
150

151

152
class ParametrizedLambda:
1✔
153
    lambda_client: "LambdaClient"
1✔
154
    function_names: list[str]
1✔
155
    scenario: str
1✔
156
    runtime: str
1✔
157
    handler: str
1✔
158
    zip_file_path: str
1✔
159
    role: str
1✔
160

161
    def __init__(
1✔
162
        self,
163
        lambda_client: "LambdaClient",
164
        scenario: str,
165
        runtime: str,
166
        handler: str,
167
        zip_file_path: str,
168
        role: str,
169
    ):
170
        self.function_names = []
1✔
171
        self.lambda_client = lambda_client
1✔
172
        self.scenario = scenario
1✔
173
        self.runtime = runtime
1✔
174
        self.handler = handler
1✔
175
        self.zip_file_path = zip_file_path
1✔
176
        self.role = role
1✔
177

178
    @overload
1✔
179
    def create_function(
1✔
180
        self,
181
        *,
182
        FunctionName: str | None = None,
183
        Role: str | None = None,
184
        Code: Optional["FunctionCodeTypeDef"] = None,
185
        Runtime: Optional["RuntimeType"] = None,
186
        Handler: str | None = None,
187
        Description: str | None = None,
188
        Timeout: int | None = None,
189
        MemorySize: int | None = None,
190
        Publish: bool | None = None,
191
        VpcConfig: Optional["VpcConfigTypeDef"] = None,
192
        PackageType: Optional["PackageTypeType"] = None,
193
        DeadLetterConfig: Optional["DeadLetterConfigTypeDef"] = None,
194
        Environment: Optional["EnvironmentTypeDef"] = None,
195
        KMSKeyArn: str | None = None,
196
        TracingConfig: Optional["TracingConfigTypeDef"] = None,
197
        Tags: Mapping[str, str] | None = None,
198
        Layers: Sequence[str] | None = None,
199
        FileSystemConfigs: Sequence["FileSystemConfigTypeDef"] | None = None,
200
        ImageConfig: Optional["ImageConfigTypeDef"] = None,
201
        CodeSigningConfigArn: str | None = None,
202
        Architectures: Sequence["ArchitectureType"] | None = None,
203
        EphemeralStorage: Optional["EphemeralStorageTypeDef"] = None,
204
        CapacityProviderConfig: Optional["CapacityProviderConfigTypeDef"] = None,
205
    ) -> "FunctionConfigurationResponseMetadataTypeDef": ...
206

207
    def create_function(self, **kwargs):
1✔
208
        kwargs.setdefault("FunctionName", f"{self.scenario}-{short_uid()}")
1✔
209
        kwargs.setdefault("Runtime", self.runtime)
1✔
210
        kwargs.setdefault("Handler", self.handler)
1✔
211
        kwargs.setdefault("Role", self.role)
1✔
212
        kwargs.setdefault("Code", {"ZipFile": load_file(self.zip_file_path, mode="rb")})
1✔
213

214
        def _create_function():
1✔
215
            return self.lambda_client.create_function(**kwargs)
1✔
216

217
        # @AWS, takes about 10s until the role/policy is "active", until then it will fail
218
        # localstack should normally not require the retries and will just continue here
219
        result = retry(_create_function, retries=3, sleep=4)
1✔
220
        self.function_names.append(result["FunctionArn"])
1✔
221

222
        def _is_not_pending():
1✔
223
            # Using custom wait condition instead of the 'function_active_v2' waiter which expects 'Active' state,
224
            # which is not true for lambda managed instances, whose state becomes in ActiveNonInvokable
225
            try:
1✔
226
                result = (
1✔
227
                    self.lambda_client.get_function(FunctionName=kwargs.get("FunctionName"))[
228
                        "Configuration"
229
                    ]["State"]
230
                    != "Pending"
231
                )
232
                LOG.debug("lambda state result: result=%s", result)
1✔
233
                return result
1✔
UNCOV
234
            except Exception as e:
×
UNCOV
235
                LOG.error(e)
×
236
                raise
×
237

238
        wait_until(_is_not_pending)
1✔
239

240
        return result
1✔
241

242
    def destroy(self):
1✔
243
        for function_name in self.function_names:
1✔
244
            try:
1✔
245
                self.lambda_client.delete_function(FunctionName=function_name)
1✔
UNCOV
246
            except Exception as e:
×
UNCOV
247
                LOG.debug("Error deleting function %s: %s", function_name, e)
×
248

249

250
def update_done(client, function_name):
    """
    Build a wait predicate for the 'LastUpdateStatus' of a Lambda function.

    The returned callable fetches the function configuration on every call and
    returns True once the status is "Successful". A "Failed" status raises
    ShortCircuitWaitException instead of returning False.
    """

    def _update_done():
        configuration = client.get_function_configuration(FunctionName=function_name)
        last_update_status = configuration["LastUpdateStatus"]
        if last_update_status == "Failed":
            raise ShortCircuitWaitException(f"Lambda Config update failed: {last_update_status=}")
        return last_update_status == "Successful"

    return _update_done
×
263

264

265
def concurrency_update_done(client, function_name, qualifier):
    """
    Build a wait predicate for the ProvisionedConcurrencyConfig 'Status'.

    The returned callable queries the provisioned concurrency config of the given
    function/qualifier and returns True once the status is "READY". A "FAILED"
    status raises ShortCircuitWaitException instead of returning False.
    """

    def _concurrency_update_done():
        concurrency_config = client.get_provisioned_concurrency_config(
            FunctionName=function_name, Qualifier=qualifier
        )
        status = concurrency_config["Status"]
        if status == "FAILED":
            raise ShortCircuitWaitException(f"Concurrency update failed: {status=}")
        return status == "READY"

    return _concurrency_update_done
1✔
278

279

280
def get_invoke_init_type(client, function_name, qualifier) -> InitializationType:
    """
    Invoke the given function qualifier and return its JSON-decoded response payload.

    NOTE(review): presumably the invoked function returns the value of its
    AWS_LAMBDA_INITIALIZATION_TYPE environment variable (on-demand vs. provisioned);
    this helper itself only decodes whatever payload comes back.
    """
    payload_stream = client.invoke(FunctionName=function_name, Qualifier=qualifier)["Payload"]
    return json.load(payload_stream)
×
284

285

286
# Policy document whose single statement allows the Lambda service principal
# (lambda.amazonaws.com) to perform sts:AssumeRole.
lambda_role = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}
# Policy document allowing, on all resources: every SQS and SNS action, read access to
# DynamoDB and Kinesis streams, writing CloudWatch logs, and listing/putting S3 objects.
# NOTE(review): judging by the name, this is attached to roles of functions used with
# event source mappings - confirm against callers.
esm_lambda_permission = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:*",
                "sns:*",
                "dynamodb:DescribeStream",
                "dynamodb:GetRecords",
                "dynamodb:GetShardIterator",
                "dynamodb:ListStreams",
                "kinesis:DescribeStream",
                "kinesis:DescribeStreamSummary",
                "kinesis:GetRecords",
                "kinesis:GetShardIterator",
                "kinesis:ListShards",
                "kinesis:ListStreams",
                "kinesis:SubscribeToShard",
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents",
                "s3:ListBucket",
                "s3:PutObject",
            ],
            "Resource": ["*"],
        }
    ],
}
325

326

327
def _await_event_source_mapping_state(lambda_client, uuid, state, retries=30):
    """Retry until the event source mapping identified by ``uuid`` reports the given ``state``."""

    def _assert_state_reached():
        current_state = lambda_client.get_event_source_mapping(UUID=uuid)["State"]
        assert current_state == state

    retry(_assert_state_reached, sleep_before=2, retries=retries)
1✔
332

333

334
def _await_event_source_mapping_enabled(lambda_client, uuid, retries=30):
    """Retry until the event source mapping identified by ``uuid`` is in state "Enabled"."""
    return _await_event_source_mapping_state(lambda_client, uuid, "Enabled", retries=retries)
338

339

340
def _await_dynamodb_table_active(dynamodb_client, table_name, retries=6):
    """Retry until ``describe_table`` reports the table status "ACTIVE"."""

    def _assert_active():
        table_description = dynamodb_client.describe_table(TableName=table_name)["Table"]
        assert table_description["TableStatus"] == "ACTIVE"

    retry(_assert_active, retries=retries, sleep_before=2)
1✔
347

348

349
def _get_lambda_invocation_events(logs_client, function_name, expected_num_events, retries=30):
    """
    Retry fetching the function's log events until exactly ``expected_num_events`` are found.

    :return: the log events from the first attempt that matched the expected count
    """

    def _fetch_expected_events():
        log_events = get_lambda_log_events(function_name, logs_client=logs_client)
        assert len(log_events) == expected_num_events
        return log_events

    return retry(_fetch_expected_events, retries=retries, sleep_before=5, sleep=5)
1✔
356

357

358
def is_docker_runtime_executor():
    """Return True when ``config.LAMBDA_RUNTIME_EXECUTOR`` is "docker" or the empty string."""
    executor = config.LAMBDA_RUNTIME_EXECUTOR
    return executor in ("docker", "")
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc