• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20942662173

12 Jan 2026 04:45PM UTC coverage: 86.905% (-0.03%) from 86.936%
20942662173

push

github

web-flow
Allow authenticated pull and push of docker images (#13569)

34 of 51 new or added lines in 4 files covered. (66.67%)

247 existing lines in 15 files now uncovered.

70218 of 80799 relevant lines covered (86.9%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.52
/localstack-core/localstack/testing/aws/lambda_utils.py
1
import itertools
1✔
2
import json
1✔
3
import logging
1✔
4
import os
1✔
5
import subprocess
1✔
6
import zipfile
1✔
7
from collections.abc import Mapping, Sequence
1✔
8
from pathlib import Path
1✔
9
from typing import TYPE_CHECKING, Optional, overload
1✔
10

11
from localstack import config
1✔
12
from localstack.services.lambda_.invocation.lambda_models import InitializationType
1✔
13
from localstack.services.lambda_.runtimes import RUNTIMES_AGGREGATED
1✔
14
from localstack.utils.files import load_file
1✔
15
from localstack.utils.platform import Arch, get_arch
1✔
16
from localstack.utils.strings import short_uid
1✔
17
from localstack.utils.sync import ShortCircuitWaitException, retry
1✔
18
from localstack.utils.testutil import get_lambda_log_events
1✔
19

20
if TYPE_CHECKING:
1✔
21
    from mypy_boto3_lambda import LambdaClient
×
22
    from mypy_boto3_lambda.literals import ArchitectureType, PackageTypeType, RuntimeType
×
UNCOV
23
    from mypy_boto3_lambda.type_defs import (
×
24
        DeadLetterConfigTypeDef,
25
        EnvironmentTypeDef,
26
        EphemeralStorageTypeDef,
27
        FileSystemConfigTypeDef,
28
        FunctionCodeTypeDef,
29
        FunctionConfigurationResponseMetadataTypeDef,
30
        ImageConfigTypeDef,
31
        TracingConfigTypeDef,
32
        VpcConfigTypeDef,
33
    )
34

35
# Module-level logger for the Lambda testing helpers.
LOG = logging.getLogger(__name__)

# Maps every concrete runtime of a runtime family to the handler string used by the test functions.
# Families are processed in the listed order; a runtime occurring in several families keeps the
# value of the family listed last.
HANDLERS = {
    runtime: handler
    for family, handler in (
        ("nodejs", "index.handler"),
        ("python", "handler.handler"),
        ("java", "echo.Handler"),
        ("ruby", "function.handler"),
        ("dotnet", "dotnet::Dotnet.Function::FunctionHandler"),
        # The handler value does not matter unless the custom runtime reads it in some way,
        # but it is a required field.
        ("provided", "function.handler"),
    )
    for runtime in RUNTIMES_AGGREGATED.get(family)
}

# Maps every concrete runtime to the name of its runtime family, which doubles as the name of the
# generic source folder used when packaging a scenario.
PACKAGE_FOR_RUNTIME = {
    runtime: family
    for family in ("nodejs", "python", "java", "ruby", "dotnet", "provided")
    for runtime in RUNTIMES_AGGREGATED.get(family)
}
55

56

57
def generate_tests(metafunc):
    """
    Parametrize the ``multiruntime_lambda`` argument of tests carrying the ``multiruntime`` marker.

    The marker must be used with keyword arguments only:

    * ``scenario`` (required): passed through as the first element of every parameter tuple
    * ``runtimes`` (optional): runtime family names (keys of ``RUNTIMES_AGGREGATED``) and/or
      concrete runtime identifiers; when missing or empty, all families are used

    Every family is expanded into its concrete runtimes, and each concrete runtime produces one
    ``(scenario, runtime, handler)`` parameter set that is passed indirectly. Tests without the
    marker are left untouched.

    :param metafunc: test generation object providing ``definition.iter_markers`` and
        ``parametrize`` (presumably pytest's ``Metafunc`` - confirm against the calling hook)
    :raises ValueError: if the marker was given positional arguments
    :raises KeyError: if ``scenario`` is missing or a selected runtime has no entry in ``HANDLERS``
    """
    marker = next(metafunc.definition.iter_markers("multiruntime"), None)
    if not marker:
        return
    if marker.args:
        raise ValueError(
            "The 'multiruntime' marker only accepts keyword arguments ('scenario', 'runtimes'), "
            f"but got positional arguments: {marker.args!r}"
        )

    scenario = marker.kwargs["scenario"]
    selected_runtimes = marker.kwargs.get("runtimes") or list(RUNTIMES_AGGREGATED)

    # Expand runtime families into concrete runtimes; anything that is not a known family is
    # treated as an already concrete runtime identifier.
    ids = [
        concrete_runtime
        for selected in selected_runtimes
        for concrete_runtime in (RUNTIMES_AGGREGATED.get(selected) or [selected])
    ]
    arg_values = [(scenario, runtime, HANDLERS[runtime]) for runtime in ids]

    metafunc.parametrize(
        argvalues=arg_values,
        argnames="multiruntime_lambda",
        indirect=True,
        ids=ids,
    )
81

82

83
def package_for_lang(scenario: str, runtime: str, root_folder: Path) -> Path:
    """
    Return the ``handler.zip`` package for a scenario/runtime pair, building it if it is missing.

    The sources are looked up in ``<root_folder>/functions/common/<scenario>/<runtime>``; if that
    folder does not exist, the generic folder of the runtime family is used instead. An existing
    ``handler.zip`` in that folder is returned as is. Otherwise ``make build`` is executed in the
    folder, and if the build did not create the zip itself, the content of the ``build``
    sub-folder is zipped.

    :param scenario: which scenario to run
    :param runtime: which runtime to build
    :param root_folder: The root folder for the scenarios
    :return: path to built zip file
    :raises KeyError: if ``runtime`` has no entry in ``PACKAGE_FOR_RUNTIME``
    :raises Exception: if the build command fails or leaves the ``build`` folder missing or empty
    """
    runtime_folder = PACKAGE_FOR_RUNTIME[runtime]

    scenario_dir = root_folder / "functions" / "common" / scenario
    runtime_dir_candidate = scenario_dir / runtime

    # if a more specific folder exists, use that one
    # otherwise: try to fall back to generic runtime (e.g. python for python3.12)
    if runtime_dir_candidate.is_dir():
        runtime_dir = runtime_dir_candidate
    else:
        runtime_dir = scenario_dir / runtime_folder

    build_dir = runtime_dir / "build"
    package_path = runtime_dir / "handler.zip"

    # caching step
    # TODO: add invalidation (e.g. via storing a hash besides this of all files in src)
    if package_path.is_file():
        return package_path

    # packaging
    # Use the default Lambda architecture x86_64 unless the ignore architecture flag is configured.
    # This enables local testing of both architectures on multi-architecture platforms such as Apple Silicon machines.
    architecture = "x86_64"
    if config.LAMBDA_IGNORE_ARCHITECTURE:
        architecture = "arm64" if get_arch() == Arch.arm64 else "x86_64"
    build_cmd = ["make", "build", f"ARCHITECTURE={architecture}"]
    LOG.debug(
        "Building Lambda function for scenario %s and runtime %s using %s.",
        scenario,
        runtime,
        " ".join(build_cmd),
    )
    result = subprocess.run(build_cmd, cwd=runtime_dir)
    if result.returncode != 0:
        raise Exception(
            f"Failed to build multiruntime {scenario=} for {runtime=} with error code: {result.returncode}"
        )

    # check again if the zip file is now present
    if package_path.is_file():
        return package_path

    # check something is in build now; a missing build folder is reported as a build failure
    # as well instead of leaking a bare FileNotFoundError
    if not build_dir.is_dir() or not any(build_dir.iterdir()):
        raise Exception(f"Failed to build multiruntime {scenario=} for {runtime=} ")

    with zipfile.ZipFile(package_path, "w", strict_timestamps=True) as zf:
        for root, _dirs, files in os.walk(build_dir):
            # archive members are stored relative to the build folder
            rel_dir = os.path.relpath(root, build_dir)
            for f in files:
                zf.write(os.path.join(root, f), arcname=os.path.join(rel_dir, f))

    # make sure package file has been generated
    assert package_path.exists() and package_path.is_file()
    return package_path
×
149

150

151
class ParametrizedLambda:
1✔
152
    lambda_client: "LambdaClient"
1✔
153
    function_names: list[str]
1✔
154
    scenario: str
1✔
155
    runtime: str
1✔
156
    handler: str
1✔
157
    zip_file_path: str
1✔
158
    role: str
1✔
159

160
    def __init__(
1✔
161
        self,
162
        lambda_client: "LambdaClient",
163
        scenario: str,
164
        runtime: str,
165
        handler: str,
166
        zip_file_path: str,
167
        role: str,
168
    ):
169
        self.function_names = []
1✔
170
        self.lambda_client = lambda_client
1✔
171
        self.scenario = scenario
1✔
172
        self.runtime = runtime
1✔
173
        self.handler = handler
1✔
174
        self.zip_file_path = zip_file_path
1✔
175
        self.role = role
1✔
176

177
    @overload
1✔
178
    def create_function(
1✔
179
        self,
180
        *,
181
        FunctionName: str | None = None,
182
        Role: str | None = None,
183
        Code: Optional["FunctionCodeTypeDef"] = None,
184
        Runtime: Optional["RuntimeType"] = None,
185
        Handler: str | None = None,
186
        Description: str | None = None,
187
        Timeout: int | None = None,
188
        MemorySize: int | None = None,
189
        Publish: bool | None = None,
190
        VpcConfig: Optional["VpcConfigTypeDef"] = None,
191
        PackageType: Optional["PackageTypeType"] = None,
192
        DeadLetterConfig: Optional["DeadLetterConfigTypeDef"] = None,
193
        Environment: Optional["EnvironmentTypeDef"] = None,
194
        KMSKeyArn: str | None = None,
195
        TracingConfig: Optional["TracingConfigTypeDef"] = None,
196
        Tags: Mapping[str, str] | None = None,
197
        Layers: Sequence[str] | None = None,
198
        FileSystemConfigs: Sequence["FileSystemConfigTypeDef"] | None = None,
199
        ImageConfig: Optional["ImageConfigTypeDef"] = None,
200
        CodeSigningConfigArn: str | None = None,
201
        Architectures: Sequence["ArchitectureType"] | None = None,
202
        EphemeralStorage: Optional["EphemeralStorageTypeDef"] = None,
203
    ) -> "FunctionConfigurationResponseMetadataTypeDef": ...
204

205
    def create_function(self, **kwargs):
1✔
206
        kwargs.setdefault("FunctionName", f"{self.scenario}-{short_uid()}")
1✔
207
        kwargs.setdefault("Runtime", self.runtime)
1✔
208
        kwargs.setdefault("Handler", self.handler)
1✔
209
        kwargs.setdefault("Role", self.role)
1✔
210
        kwargs.setdefault("Code", {"ZipFile": load_file(self.zip_file_path, mode="rb")})
1✔
211

212
        def _create_function():
1✔
213
            return self.lambda_client.create_function(**kwargs)
1✔
214

215
        # @AWS, takes about 10s until the role/policy is "active", until then it will fail
216
        # localstack should normally not require the retries and will just continue here
217
        result = retry(_create_function, retries=3, sleep=4)
1✔
218
        self.function_names.append(result["FunctionArn"])
1✔
219
        self.lambda_client.get_waiter("function_active_v2").wait(
1✔
220
            FunctionName=kwargs.get("FunctionName")
221
        )
222

223
        return result
1✔
224

225
    def destroy(self):
1✔
226
        for function_name in self.function_names:
1✔
227
            try:
1✔
228
                self.lambda_client.delete_function(FunctionName=function_name)
1✔
229
            except Exception as e:
×
UNCOV
230
                LOG.debug("Error deleting function %s: %s", function_name, e)
×
231

232

233
def update_done(client, function_name):
    """
    Build a wait function checking the 'LastUpdateStatus' of a Lambda function.

    :param client: client providing ``get_function_configuration``
    :param function_name: name passed as ``FunctionName``
    :return: a zero-argument callable returning True once the status is "Successful" and False
        for any other status, except "Failed" which raises ``ShortCircuitWaitException``
    """

    def _update_done():
        configuration = client.get_function_configuration(FunctionName=function_name)
        last_update_status = configuration["LastUpdateStatus"]
        if last_update_status != "Failed":
            return last_update_status == "Successful"
        # a failed update can never become successful, so abort the surrounding wait
        raise ShortCircuitWaitException(f"Lambda Config update failed: {last_update_status=}")

    return _update_done
×
246

247

248
def concurrency_update_done(client, function_name, qualifier):
    """
    Build a wait function checking the 'Status' of a ProvisionedConcurrencyConfig.

    :param client: client providing ``get_provisioned_concurrency_config``
    :param function_name: name passed as ``FunctionName``
    :param qualifier: value passed as ``Qualifier``
    :return: a zero-argument callable returning True once the status is "READY" and False for
        any other status, except "FAILED" which raises ``ShortCircuitWaitException``
    """

    def _concurrency_update_done():
        concurrency_config = client.get_provisioned_concurrency_config(
            FunctionName=function_name, Qualifier=qualifier
        )
        status = concurrency_config["Status"]
        if status != "FAILED":
            return status == "READY"
        # a failed configuration will not recover, so abort the surrounding wait
        raise ShortCircuitWaitException(f"Concurrency update failed: {status=}")

    return _concurrency_update_done
1✔
261

262

263
def get_invoke_init_type(client, function_name, qualifier) -> InitializationType:
    """
    Invoke the given function version/alias and return its JSON-decoded response payload.

    NOTE(review): per the original contract the invoked function reports its
    AWS_LAMBDA_INITIALIZATION_TYPE (ondemand/provisioned) as payload - this depends on the deployed
    function code and cannot be verified from here.
    """
    response = client.invoke(FunctionName=function_name, Qualifier=qualifier)
    payload_stream = response["Payload"]
    return json.load(payload_stream)
×
267

268

269
# Policy document allowing the Lambda service principal to call sts:AssumeRole.
lambda_role = {
    "Version": "2012-10-17",
    "Statement": [
        {"Effect": "Allow", "Principal": {"Service": "lambda.amazonaws.com"}, "Action": "sts:AssumeRole"}
    ],
}
279
# Policy document granting the listed SQS, SNS, DynamoDB, Kinesis, CloudWatch Logs and S3 actions
# on all resources. The order of the actions is kept stable.
esm_lambda_permission = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                # messaging
                *["sqs:*", "sns:*"],
                # DynamoDB
                *[
                    "dynamodb:DescribeStream",
                    "dynamodb:GetRecords",
                    "dynamodb:GetShardIterator",
                    "dynamodb:ListStreams",
                ],
                # Kinesis
                *[
                    "kinesis:DescribeStream",
                    "kinesis:DescribeStreamSummary",
                    "kinesis:GetRecords",
                    "kinesis:GetShardIterator",
                    "kinesis:ListShards",
                    "kinesis:ListStreams",
                    "kinesis:SubscribeToShard",
                ],
                # CloudWatch Logs
                *["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"],
                # S3
                *["s3:ListBucket", "s3:PutObject"],
            ],
            "Resource": ["*"],
        }
    ],
}
308

309

310
def _await_event_source_mapping_state(lambda_client, uuid, state, retries=30):
    """
    Repeatedly check, via ``retry``, that the event source mapping reports the given state.

    :param lambda_client: client providing ``get_event_source_mapping``
    :param uuid: identifier passed as ``UUID``
    :param state: expected value of the mapping's ``State``
    :param retries: forwarded to ``retry``
    """

    # despite its name, this check compares against whatever ``state`` was requested
    def assert_mapping_disabled():
        mapping = lambda_client.get_event_source_mapping(UUID=uuid)
        assert mapping["State"] == state

    retry(assert_mapping_disabled, retries=retries, sleep_before=2)
1✔
315

316

317
def _await_event_source_mapping_enabled(lambda_client, uuid, retries=30):
    """Wait for the event source mapping identified by ``uuid`` to report the state "Enabled"."""
    return _await_event_source_mapping_state(lambda_client, uuid, "Enabled", retries=retries)
321

322

323
def _await_dynamodb_table_active(dynamodb_client, table_name, retries=6):
    """
    Repeatedly check, via ``retry``, that the table's ``TableStatus`` is "ACTIVE".

    :param dynamodb_client: client providing ``describe_table``
    :param table_name: name passed as ``TableName``
    :param retries: forwarded to ``retry``
    """

    def assert_table_active():
        description = dynamodb_client.describe_table(TableName=table_name)
        assert description["Table"]["TableStatus"] == "ACTIVE"

    retry(assert_table_active, sleep_before=2, retries=retries)
1✔
330

331

332
def _get_lambda_invocation_events(logs_client, function_name, expected_num_events, retries=30):
    """
    Fetch the log events of a function via ``retry`` until exactly the expected number is found.

    :param logs_client: client handed to ``get_lambda_log_events``
    :param function_name: function whose log events are fetched
    :param expected_num_events: exact number of events that must be present
    :param retries: forwarded to ``retry``
    :return: the result of ``retry``, i.e. presumably the events of the first successful
        attempt - confirm against the ``retry`` helper
    """

    def get_events():
        found_events = get_lambda_log_events(function_name, logs_client=logs_client)
        assert len(found_events) == expected_num_events
        return found_events

    return retry(get_events, sleep_before=5, sleep=5, retries=retries)
1✔
339

340

341
def is_docker_runtime_executor():
    """Return True if ``config.LAMBDA_RUNTIME_EXECUTOR`` is "docker" or the empty string."""
    configured_executor = config.LAMBDA_RUNTIME_EXECUTOR
    return configured_executor in ("docker", "")
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc