• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 17265699519

27 Aug 2025 11:28AM UTC coverage: 86.827% (-0.01%) from 86.837%
17265699519

push

github

web-flow
Fix SQS tests failing due to missing snapshot update after #12957 (#13062)

67057 of 77231 relevant lines covered (86.83%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.0
/localstack-core/localstack/utils/testutil.py
1
import glob
1✔
2
import importlib
1✔
3
import io
1✔
4
import json
1✔
5
import os
1✔
6
import re
1✔
7
import shutil
1✔
8
import tempfile
1✔
9
import time
1✔
10
from typing import Any, Callable, Optional
1✔
11

12
from localstack.aws.api.lambda_ import Runtime
1✔
13
from localstack.aws.connect import connect_externally_to, connect_to
1✔
14
from localstack.testing.aws.util import is_aws_cloud
1✔
15
from localstack.utils.aws import arns
1✔
16
from localstack.utils.aws import resources as resource_utils
1✔
17
from localstack.utils.aws.request_context import mock_aws_request_headers
1✔
18
from localstack.utils.urls import localstack_host
1✔
19

20
try:
1✔
21
    from typing import Literal
1✔
22
except ImportError:
×
23
    from typing_extensions import Literal
×
24

25
import boto3
1✔
26
import requests
1✔
27

28
from localstack import config
1✔
29
from localstack.constants import (
1✔
30
    LOCALSTACK_ROOT_FOLDER,
31
    LOCALSTACK_VENV_FOLDER,
32
)
33
from localstack.services.lambda_.lambda_utils import (
1✔
34
    get_handler_file_from_name,
35
)
36
from localstack.testing.config import (
1✔
37
    TEST_AWS_ACCESS_KEY_ID,
38
    TEST_AWS_ACCOUNT_ID,
39
    TEST_AWS_REGION_NAME,
40
)
41
from localstack.utils.archives import create_zip_file_cli, create_zip_file_python
1✔
42
from localstack.utils.collections import ensure_list
1✔
43
from localstack.utils.files import (
1✔
44
    TMP_FILES,
45
    chmod_r,
46
    cp_r,
47
    is_empty_dir,
48
    load_file,
49
    mkdir,
50
    rm_rf,
51
    save_file,
52
)
53
from localstack.utils.platform import is_debian
1✔
54
from localstack.utils.strings import short_uid, to_str
1✔
55

56
ARCHIVE_DIR_PREFIX = "lambda.archive."
1✔
57
DEFAULT_GET_LOG_EVENTS_DELAY = 3
1✔
58
LAMBDA_DEFAULT_HANDLER = "handler.handler"
1✔
59
LAMBDA_DEFAULT_RUNTIME = Runtime.python3_12
1✔
60
LAMBDA_DEFAULT_STARTING_POSITION = "LATEST"
1✔
61
LAMBDA_TIMEOUT_SEC = 30
1✔
62
LAMBDA_ASSETS_BUCKET_NAME = "ls-test-lambda-assets-bucket"
1✔
63
LAMBDA_TEST_ROLE = "arn:aws:iam::{account_id}:role/lambda-test-role"
1✔
64
MAX_LAMBDA_ARCHIVE_UPLOAD_SIZE = 50_000_000
1✔
65

66

67
def is_local_test_mode():
    """Return True if LocalStack currently runs in local test mode (delegates to config)."""
    mode_active = config.is_local_test_mode()
    return mode_active
69

70

71
def create_lambda_archive(
    script: str,
    get_content: bool = False,
    libs: list[str] = None,
    runtime: str = None,
    file_name: str = None,
    exclude_func: Callable[[str], bool] = None,
):
    """Utility method to create a Lambda function archive.

    :param script: source code of the handler file written into the archive
    :param get_content: if True, return the raw zip bytes instead of a file path
    :param libs: names of importable modules/packages to bundle into the archive
    :param runtime: Lambda runtime identifier; defaults to LAMBDA_DEFAULT_RUNTIME
    :param file_name: target handler file name (may contain path separators);
        derived from LAMBDA_DEFAULT_HANDLER if not given
    :param exclude_func: predicate receiving an archive-relative path; entries
        for which it returns True are removed before zipping
    :return: zip content bytes (get_content=True) or path to the created zip file
    """
    if libs is None:
        libs = []
    runtime = runtime or LAMBDA_DEFAULT_RUNTIME

    with tempfile.TemporaryDirectory(prefix=ARCHIVE_DIR_PREFIX) as tmp_dir:
        file_name = file_name or get_handler_file_from_name(LAMBDA_DEFAULT_HANDLER, runtime=runtime)
        script_file = os.path.join(tmp_dir, file_name)
        if os.path.sep in script_file:
            mkdir(os.path.dirname(script_file))
            # create __init__.py files along the path to allow Python imports
            path = file_name.split(os.path.sep)
            for i in range(1, len(path)):
                save_file(os.path.join(tmp_dir, *(path[:i] + ["__init__.py"])), "")
        save_file(script_file, script)
        chmod_r(script_file, 0o777)
        # copy libs
        for lib in libs:
            # candidate locations: a package dir, a single-file module, and
            # (when importable in this process) the module's resolved file path
            paths = [lib, f"{lib}.py"]
            try:
                module = importlib.import_module(lib)
                paths.append(module.__file__)
            except Exception:
                pass
            target_dir = tmp_dir
            root_folder = os.path.join(LOCALSTACK_VENV_FOLDER, "lib/python*/site-packages")
            if lib == "localstack":
                # special case: bundle a trimmed copy of the localstack package itself
                paths = ["localstack/*.py", "localstack/utils"]
                root_folder = LOCALSTACK_ROOT_FOLDER
                target_dir = os.path.join(tmp_dir, lib)
                mkdir(target_dir)
            for path in paths:
                file_to_copy = path if path.startswith("/") else os.path.join(root_folder, path)
                for file_path in glob.glob(file_to_copy):
                    name = os.path.join(target_dir, file_path.split(os.path.sep)[-1])
                    if os.path.isdir(file_path):
                        cp_r(file_path, name)
                    else:
                        shutil.copyfile(file_path, name)

        if exclude_func:
            # prune archive entries matching the exclusion predicate
            for dirpath, folders, files in os.walk(tmp_dir):
                for name in list(folders) + list(files):
                    full_name = os.path.join(dirpath, name)
                    relative = os.path.relpath(full_name, start=tmp_dir)
                    if exclude_func(relative):
                        rm_rf(full_name)

        # create zip file
        result = create_zip_file(tmp_dir, get_content=get_content)
        return result
130

131

132
def create_zip_file(
    file_path: str,
    zip_file: str = None,
    get_content: bool = False,
    content_root: str = None,
    mode: Literal["r", "w", "x", "a"] = "w",
):
    """
    Creates a zipfile to the designated file_path.

    By default, a new zip file is created but the mode parameter can be used to append to an existing zip file

    :param file_path: file or directory to package; a single file is first staged
        in a temp directory so it becomes the sole archive entry
    :param zip_file: target zip path; a temp location is used when not given
    :param get_content: if True, return the zip bytes instead of the file path
    :param content_root: root folder name inside the archive (python impl only)
    :param mode: zipfile open mode; "a" appends to an existing archive
    """
    base_dir = file_path
    if not os.path.isdir(file_path):
        # single file: copy it into a fresh temp dir that becomes the zip root
        base_dir = tempfile.mkdtemp(prefix=ARCHIVE_DIR_PREFIX)
        shutil.copy(file_path, base_dir)
        TMP_FILES.append(base_dir)
    tmp_dir = tempfile.mkdtemp(prefix=ARCHIVE_DIR_PREFIX)
    full_zip_file = zip_file
    if not full_zip_file:
        zip_file_name = "archive.zip"
        full_zip_file = os.path.join(tmp_dir, zip_file_name)
    # special case where target folder is empty -> create empty zip file
    if is_empty_dir(base_dir):
        # see https://stackoverflow.com/questions/25195495/how-to-create-an-empty-zip-file#25195628
        content = (
            b"PK\x05\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
        )
        if get_content:
            return content
        save_file(full_zip_file, content)
        return full_zip_file

    # TODO: using a different packaging method here also produces wildly different .zip package sizes
    if is_debian() and "PYTEST_CURRENT_TEST" not in os.environ:
        # todo: extend CLI with the new parameters
        create_zip_file_cli(source_path=file_path, base_dir=base_dir, zip_file=full_zip_file)
    else:
        create_zip_file_python(
            base_dir=base_dir, zip_file=full_zip_file, mode=mode, content_root=content_root
        )
    if not get_content:
        # caller keeps the file -> defer temp dir cleanup to TMP_FILES handling
        TMP_FILES.append(tmp_dir)
        return full_zip_file
    with open(full_zip_file, "rb") as file_obj:
        zip_file_content = file_obj.read()
    rm_rf(tmp_dir)
    return zip_file_content
180

181

182
# TODO: make the `client` parameter mandatory to enforce proper xaccount access
183
def create_lambda_function(
    func_name,
    zip_file=None,
    event_source_arn=None,
    handler_file=None,
    handler=None,
    starting_position=None,
    runtime=None,
    envvars=None,
    tags=None,
    libs=None,
    delete=False,
    layers=None,
    client=None,
    role=None,
    timeout=None,
    region_name=None,
    s3_client=None,
    **kwargs,
):
    """Utility method to create a new function via the Lambda API
    CAVEAT: Does NOT wait until the function is ready/active. The fixture create_lambda_function waits until ready.

    :param func_name: name of the Lambda function to create
    :param zip_file: deployment package (bytes); built from handler_file if absent
    :param event_source_arn: if set, additionally create an event source mapping
    :param handler_file: path to the handler file, or the handler source itself
    :param delete: delete a pre-existing function of the same name first (best effort)
    :param kwargs: additional parameters passed through to create_function
    :return: dict with keys CreateFunctionResponse and CreateEventSourceMappingResponse
    """
    if envvars is None:
        envvars = {}
    if tags is None:
        tags = {}
    if libs is None:
        libs = []

    starting_position = starting_position or LAMBDA_DEFAULT_STARTING_POSITION
    runtime = runtime or LAMBDA_DEFAULT_RUNTIME
    client = client or connect_to(region_name=region_name).lambda_

    # load zip file content if handler_file is specified
    if not zip_file and handler_file:
        # handler_file may be a path on disk or the literal handler source code
        file_content = load_file(handler_file) if os.path.exists(handler_file) else handler_file
        if libs or not handler:
            zip_file = create_lambda_archive(
                file_content,
                libs=libs,
                get_content=True,
                runtime=runtime or LAMBDA_DEFAULT_RUNTIME,
            )
        else:
            zip_file = create_zip_file(handler_file, get_content=True)

    handler = handler or LAMBDA_DEFAULT_HANDLER

    if delete:
        try:
            # Delete function if one already exists
            client.delete_function(FunctionName=func_name)
        except Exception:
            pass

    lambda_code = {"ZipFile": zip_file}
    if len(zip_file) > MAX_LAMBDA_ARCHIVE_UPLOAD_SIZE:
        # archive too large for direct upload -> stage it in an assets S3 bucket
        s3 = s3_client or connect_externally_to().s3
        resource_utils.get_or_create_bucket(LAMBDA_ASSETS_BUCKET_NAME)
        asset_key = f"{short_uid()}.zip"
        s3.upload_fileobj(
            Fileobj=io.BytesIO(zip_file), Bucket=LAMBDA_ASSETS_BUCKET_NAME, Key=asset_key
        )
        lambda_code = {"S3Bucket": LAMBDA_ASSETS_BUCKET_NAME, "S3Key": asset_key}

    # create function
    additional_kwargs = kwargs
    kwargs = {
        "FunctionName": func_name,
        "Runtime": runtime,
        "Handler": handler,
        "Role": role or LAMBDA_TEST_ROLE.format(account_id=TEST_AWS_ACCOUNT_ID),
        "Code": lambda_code,
        "Timeout": timeout or LAMBDA_TIMEOUT_SEC,
        "Environment": {"Variables": envvars},
        "Tags": tags,
    }
    # caller-provided kwargs take precedence over the defaults above
    kwargs.update(additional_kwargs)
    if layers:
        kwargs["Layers"] = layers
    create_func_resp = client.create_function(**kwargs)

    resp = {
        "CreateFunctionResponse": create_func_resp,
        "CreateEventSourceMappingResponse": None,
    }

    # create event source mapping
    if event_source_arn:
        resp["CreateEventSourceMappingResponse"] = client.create_event_source_mapping(
            FunctionName=func_name,
            EventSourceArn=event_source_arn,
            StartingPosition=starting_position,
        )

    return resp
280

281

282
def connect_api_gateway_to_http_with_lambda_proxy(
    gateway_name,
    target_uri,
    stage_name=None,
    methods=None,
    path=None,
    auth_type=None,
    auth_creator_func=None,
    http_method=None,
    client=None,
    role_arn: str = None,
):
    """Create an API Gateway whose resources proxy requests to a Lambda via AWS_PROXY.

    Defaults: methods GET/POST/DELETE, path "/", stage "test". The optional
    http_method overrides the integration HTTP method for every resource method.
    """
    effective_methods = list(methods) if methods else ["GET", "POST", "DELETE"]
    resource_key = (path or "/").lstrip("/")

    method_definitions = []
    for verb in effective_methods:
        integration = {
            "type": "AWS_PROXY",
            "uri": target_uri,
            "httpMethod": http_method or verb,
        }
        if role_arn:
            integration["credentials"] = role_arn
        method_definitions.append(
            {
                "httpMethod": verb,
                "authorizationType": auth_type,
                "authorizerId": None,
                "integrationHttpMethod": "POST",
                "integrations": [integration],
            }
        )

    return resource_utils.create_api_gateway(
        name=gateway_name,
        resources={resource_key: method_definitions},
        stage_name=stage_name or "test",
        auth_creator_func=auth_creator_func,
        client=client,
    )
326

327

328
def create_lambda_api_gateway_integration(
    gateway_name,
    func_name,
    handler_file,
    lambda_client,
    methods=None,
    path=None,
    runtime=None,
    stage_name=None,
    auth_type=None,
    auth_creator_func=None,
    role_arn: str = None,
):
    """Deploy a Lambda built from handler_file and put an API Gateway proxy in front of it."""
    if methods is None:
        methods = []

    # deploy the Lambda function
    archive = create_lambda_archive(handler_file, get_content=True, runtime=runtime)
    create_resp = create_lambda_function(
        func_name=func_name, zip_file=archive, runtime=runtime, client=lambda_client
    )
    function_arn = create_resp["CreateFunctionResponse"]["FunctionArn"]
    invocation_arn = arns.apigateway_invocations_arn(function_arn, TEST_AWS_REGION_NAME)

    # wire the API Gateway proxy integration to the function
    return connect_api_gateway_to_http_with_lambda_proxy(
        gateway_name,
        invocation_arn,
        stage_name=stage_name or "test",
        path=path or "/test",
        methods=methods,
        auth_type=auth_type or "REQUEST",
        auth_creator_func=auth_creator_func,
        role_arn=role_arn,
    )
366

367

368
def assert_objects(asserts, all_objects):
    """Assert that each expected object (or a single one) is contained in all_objects."""
    expected_items = asserts if type(asserts) is list else [asserts]
    for expected in expected_items:
        assert_object(expected, all_objects)
373

374

375
def assert_object(expected_object, all_objects):
    """Assert that expected_object (partially) matches an entry of all_objects."""
    # normalize dict views to a plain list (Python 3 compatibility)
    if isinstance(all_objects, type({}.values())):
        all_objects = list(all_objects)
    # wrap a single item in a list
    if type(all_objects) is not list:
        all_objects = [all_objects]
    if not find_object(expected_object, all_objects):
        raise Exception(f"Expected object not found: {expected_object} in list {all_objects}")
386

387

388
def find_object(expected_object, object_list):
    """Return the first element of object_list matching expected_object, or None.

    A dict expectation matches a candidate if every one of its key/value pairs
    occurs (recursively) within the candidate; any other expectation type
    matches on plain equality. Nested lists are searched recursively first.
    """
    for obj in object_list:
        if isinstance(obj, list):
            found = find_object(expected_object, obj)
            if found:
                return found
        # NOTE: if nothing matched inside a nested list, execution falls
        # through and the list itself is compared against expected_object

        all_ok = True
        if obj != expected_object:
            if not isinstance(expected_object, dict):
                all_ok = False
            else:
                # partial dict match: every expected key/value must occur somewhere in obj
                for k, v in expected_object.items():
                    if not find_recursive(k, v, obj):
                        all_ok = False
                        break
        if all_ok:
            return obj
    return None
407

408

409
def find_recursive(key, value, obj):
    """Recursively search `obj` (nested dicts/lists) for a `key: value` entry.

    :param key: dict key to look for
    :param value: value that must be associated with `key`
    :param obj: arbitrarily nested structure of dicts/lists/scalars
    :return: True if any nested dict contains the key/value pair, else False
    """
    if isinstance(obj, dict):
        for k, v in obj.items():
            if k == key and v == value:
                return True
            if find_recursive(key, value, v):
                return True
        # fix: previously fell through and implicitly returned None on a miss
        return False
    if isinstance(obj, list):
        return any(find_recursive(key, value, item) for item in obj)
    # scalar leaf: cannot contain a key/value pair
    return False
422

423

424
def list_all_s3_objects(s3_client):
    """Return the contents of all S3 objects across all buckets."""
    objects_by_path = map_all_s3_objects(s3_client=s3_client)
    return objects_by_path.values()
426

427

428
def delete_all_s3_objects(s3_client, buckets: str | list[str]):
    """Delete every object in the given bucket(s)."""
    for bucket_name in ensure_list(buckets):
        object_keys = all_s3_object_keys(s3_client, bucket_name)
        if object_keys:
            delete_spec = {"Objects": [{"Key": key} for key in object_keys]}
            s3_client.delete_objects(Bucket=bucket_name, Delete=delete_spec)
435

436

437
def download_s3_object(s3_client, bucket, path):
    """Download an S3 object; return its payload as str when decodable, else raw bytes."""
    raw = s3_client.get_object(Bucket=bucket, Key=path)["Body"].read()
    try:
        return to_str(raw)
    except Exception:
        # binary payload that cannot be decoded - return as-is
        return raw
445

446

447
def all_s3_object_keys(s3_client, bucket: str) -> list[str]:
    """Return the object keys of a bucket (first page of list_objects_v2)."""
    listing = s3_client.list_objects_v2(Bucket=bucket)
    return [entry["Key"] for entry in listing.get("Contents", [])]
451

452

453
def map_all_s3_objects(
    s3_client, to_json: bool = True, buckets: str | list[str] = None
) -> dict[str, Any]:
    """Map '<bucket>/<key>' to object content for the given (or all) buckets.

    Content is JSON-parsed when to_json is True; objects that fail parsing
    (non-JSON or binary) are silently skipped.
    """
    mapping = {}
    bucket_names = ensure_list(buckets)
    if not bucket_names:
        # no buckets given -> enumerate all buckets
        bucket_names = [entry["Name"] for entry in s3_client.list_buckets()["Buckets"]]

    for bucket_name in bucket_names:
        listing = s3_client.list_objects_v2(Bucket=bucket_name)
        for entry in listing.get("Contents", []):
            object_key = entry["Key"]
            content = download_s3_object(s3_client, bucket_name, object_key)
            try:
                if to_json:
                    content = json.loads(content)
                sep = "" if object_key.startswith("/") else "/"
                mapping[f"{bucket_name}{sep}{object_key}"] = content
            except Exception:
                # skip non-JSON or binary objects
                pass
    return mapping
477

478

479
def send_describe_dynamodb_ttl_request(table_name):
    """Send a raw DescribeTimeToLive request for the given DynamoDB table."""
    payload = json.dumps({"TableName": table_name})
    return send_dynamodb_request("", "DescribeTimeToLive", payload)
481

482

483
def send_update_dynamodb_ttl_request(table_name, ttl_status):
    """Send a raw UpdateTimeToLive request toggling TTL on attribute 'ExpireItem'."""
    request_body = {
        "TableName": table_name,
        "TimeToLiveSpecification": {
            "AttributeName": "ExpireItem",
            "Enabled": ttl_status,
        },
    }
    return send_dynamodb_request("", "UpdateTimeToLive", json.dumps(request_body))
497

498

499
def send_dynamodb_request(path, action, request_body):
    """PUT a raw DynamoDB API request against the LocalStack endpoint."""
    auth_header = mock_aws_request_headers(
        "dynamodb", aws_access_key_id=TEST_AWS_ACCESS_KEY_ID, region_name=TEST_AWS_REGION_NAME
    )["Authorization"]
    headers = {
        "Host": "dynamodb.amazonaws.com",
        "x-amz-target": f"DynamoDB_20120810.{action}",
        "Authorization": auth_header,
    }
    target_url = f"{config.internal_service_url()}/{path}"
    return requests.put(target_url, data=request_body, headers=headers, verify=False)
509

510

511
def get_lambda_log_group_name(function_name):
    """Return the CloudWatch Logs group name for a Lambda function."""
    return "/aws/lambda/" + function_name
513

514

515
# TODO: make logs_client mandatory
516
def check_expected_lambda_log_events_length(
    expected_length, function_name, regex_filter=None, logs_client=None
):
    """Fetch the Lambda's log events and assert their count equals expected_length.

    Prints a (truncated) dump of the events on mismatch to aid debugging,
    then asserts. Returns the filtered events on success.
    """
    events = get_lambda_log_events(
        function_name, regex_filter=regex_filter, logs_client=logs_client
    )
    # drop pure ANSI reset sequences
    events = [line for line in events if line not in ["\x1b[0m", "\\x1b[0m"]]
    if len(events) != expected_length:
        truncated = [
            event if len(event) < 1000 else f"{event[:1000]}... (truncated)"
            for event in events
        ]
        print(
            "Invalid # of Lambda {} log events: {} / {}: {}".format(
                function_name, len(events), expected_length, truncated
            )
        )
    assert len(events) == expected_length
    return events
537

538

539
def list_all_log_events(log_group_name: str, logs_client=None) -> list[dict]:
    """Fetch all events of a CloudWatch log group, following pagination."""
    client = logs_client or connect_to().logs

    def fetch_page(kwargs):
        return client.filter_log_events(logGroupName=log_group_name, **kwargs)

    return list_all_resources(
        fetch_page,
        last_token_attr_name="nextToken",
        list_attr_name="events",
    )
546

547

548
def get_lambda_log_events(
    function_name,
    delay_time=DEFAULT_GET_LOG_EVENTS_DELAY,
    regex_filter: Optional[str] = None,
    log_group=None,
    logs_client=None,
):
    """Fetch and filter the CloudWatch log messages of a Lambda function.

    Waits `delay_time` seconds before fetching, skips Lambda runtime
    bookkeeping lines (INIT_START/START/END/REPORT) and ANSI reset sequences,
    and JSON-decodes each remaining message where possible. Returns [] when
    the log group does not exist (yet).

    :param regex_filter: if set, only messages matching this regex are kept
    :param log_group: explicit log group name; defaults to /aws/lambda/<name>
    """
    def get_log_events(func_name, delay):
        time.sleep(delay)
        log_group_name = log_group or get_lambda_log_group_name(func_name)
        return list_all_log_events(log_group_name, logs_client)

    try:
        events = get_log_events(function_name, delay_time)
    except Exception as e:
        if "ResourceNotFoundException" in str(e):
            # log group not created yet -> treat as "no events"
            return []
        raise

    rs = []
    for event in events:
        raw_message = event["message"]
        # NOTE: `and` binds tighter than `or`, so the trailing clause means
        # "regex_filter is set AND the message does not match it"
        if (
            not raw_message
            or raw_message.startswith("INIT_START")
            or raw_message.startswith("START")
            or raw_message.startswith("END")
            or raw_message.startswith(
                "REPORT"
            )  # necessary until tail is updated in docker images. See this PR:
            # http://git.savannah.gnu.org/gitweb/?p=coreutils.git;a=commitdiff;h=v8.24-111-g1118f32
            or "tail: unrecognized file system type" in raw_message
            or regex_filter
            and not re.search(regex_filter, raw_message)
        ):
            continue
        if raw_message in ["\x1b[0m", "\\x1b[0m"]:
            # skip pure ANSI reset sequences
            continue

        try:
            rs.append(json.loads(raw_message))
        except Exception:
            # not JSON - keep the raw string
            rs.append(raw_message)

    return rs
593

594

595
def list_all_resources(
    page_function: Callable[[dict], Any],
    last_token_attr_name: str,
    list_attr_name: str,
    next_token_attr_name: Optional[str] = None,
) -> list:
    """
    Collect all items from a paginated API by repeatedly calling `page_function`.

    :param page_function: callable receiving kwargs with the pagination token
        and returning the next result page (a dict)
    :param last_token_attr_name: response attribute holding the next-page token
    :param list_attr_name: response attribute holding the page's items
    :param next_token_attr_name: request kwarg used to send the token back;
        defaults to `last_token_attr_name`

    Example usage:

        all_log_groups = list_all_resources(
            lambda kwargs: logs.describe_log_groups(**kwargs),
            last_token_attr_name="nextToken",
            list_attr_name="logGroups",
        )

        all_records = list_all_resources(
            lambda kwargs: dynamodb.scan(**{**kwargs, **dynamodb_kwargs}),
            last_token_attr_name="LastEvaluatedKey",
            next_token_attr_name="ExclusiveStartKey",
            list_attr_name="Items",
        )
    """
    if next_token_attr_name is None:
        next_token_attr_name = last_token_attr_name

    collected = []
    page = None
    token = None
    # loop until at least one page was fetched and no continuation token remains
    while not page or token:
        request_kwargs = {next_token_attr_name: token} if token else {}
        page = page_function(request_kwargs)
        token = page.get(last_token_attr_name)
        collected += page.get(list_attr_name, [])

    return collected
647

648

649
def response_arn_matches_partition(client, response_arn: str) -> bool:
    """Check that an ARN's partition and its region's partition both match the client's."""
    arn_parts = arns.parse_arn(response_arn)
    region_partition = boto3.session.Session().get_partition_for_region(arn_parts["region"])
    if client.meta.partition != region_partition:
        return False
    return client.meta.partition == arn_parts["partition"]
656

657

658
def upload_file_to_bucket(s3_client, bucket_name, file_path, file_name=None):
    """Upload a local file to S3 and return bucket, key, and a virtual-hosted URL."""
    key = file_name or f"file-{short_uid()}"

    s3_client.upload_file(file_path, Bucket=bucket_name, Key=key)

    if is_aws_cloud():
        domain = "amazonaws.com"
    else:
        domain = localstack_host().host_and_port()

    return {
        "Bucket": bucket_name,
        "Key": key,
        "Url": f"https://{bucket_name}.s3.{domain}/{key}",
    }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc