• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19821277742

01 Dec 2025 08:16AM UTC coverage: 86.821% (-0.04%) from 86.863%
19821277742

push

github

web-flow
Add Lambda Managed Instances (#13440)

Co-authored-by: Joel Scheuner <joel.scheuner.dev@gmail.com>
Co-authored-by: Anisa Oshafi <anisaoshafi@gmail.com>
Co-authored-by: Cristopher Pinzón <cristopher.pinzon@gmail.com>
Co-authored-by: Alexander Rashed <alexander.rashed@localstack.cloud>
Co-authored-by: Dominik Schubert <dominik.schubert91@gmail.com>
Co-authored-by: Mathieu Cloutier <79954947+cloutierMat@users.noreply.github.com>
Co-authored-by: Simon Walker <simon.walker@localstack.cloud>

127 of 181 new or added lines in 11 files covered. (70.17%)

17 existing lines in 5 files now uncovered.

69556 of 80114 relevant lines covered (86.82%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.16
/localstack-core/localstack/testing/pytest/fixtures.py
1
import contextlib
1✔
2
import dataclasses
1✔
3
import json
1✔
4
import logging
1✔
5
import os
1✔
6
import re
1✔
7
import textwrap
1✔
8
import time
1✔
9
from collections.abc import Callable
1✔
10
from typing import TYPE_CHECKING, Any, Unpack
1✔
11

12
import botocore.auth
1✔
13
import botocore.config
1✔
14
import botocore.credentials
1✔
15
import botocore.session
1✔
16
import pytest
1✔
17
from _pytest.config import Config
1✔
18
from _pytest.nodes import Item
1✔
19
from botocore.exceptions import ClientError
1✔
20
from botocore.regions import EndpointResolver
1✔
21
from pytest_httpserver import HTTPServer
1✔
22
from werkzeug import Request, Response
1✔
23

24
from localstack import config
1✔
25
from localstack.aws.api.cloudformation import CreateChangeSetInput, Parameter
1✔
26
from localstack.aws.api.ec2 import CreateSecurityGroupRequest, CreateVpcEndpointRequest, VpcEndpoint
1✔
27
from localstack.aws.connect import ServiceLevelClientFactory
1✔
28
from localstack.services.stores import (
1✔
29
    AccountRegionBundle,
30
    BaseStore,
31
    CrossAccountAttribute,
32
    CrossRegionAttribute,
33
    LocalAttribute,
34
)
35
from localstack.testing.aws.cloudformation_utils import load_template_file, render_template
1✔
36
from localstack.testing.aws.util import get_lambda_logs, is_aws_cloud
1✔
37
from localstack.testing.config import (
1✔
38
    SECONDARY_TEST_AWS_ACCOUNT_ID,
39
    SECONDARY_TEST_AWS_REGION_NAME,
40
    TEST_AWS_ACCOUNT_ID,
41
    TEST_AWS_REGION_NAME,
42
)
43
from localstack.utils import testutil
1✔
44
from localstack.utils.aws.arns import get_partition
1✔
45
from localstack.utils.aws.client import SigningHttpClient
1✔
46
from localstack.utils.aws.resources import create_dynamodb_table
1✔
47
from localstack.utils.bootstrap import is_api_enabled
1✔
48
from localstack.utils.collections import ensure_list, select_from_typed_dict
1✔
49
from localstack.utils.functions import call_safe, run_safe
1✔
50
from localstack.utils.http import safe_requests as requests
1✔
51
from localstack.utils.id_generator import ResourceIdentifier, localstack_id_manager
1✔
52
from localstack.utils.json import CustomEncoder, json_safe
1✔
53
from localstack.utils.net import wait_for_port_open
1✔
54
from localstack.utils.strings import short_uid, to_str
1✔
55
from localstack.utils.sync import ShortCircuitWaitException, poll_condition, retry, wait_until
1✔
56

57
# Module-level logger for this fixtures module.
LOG = logging.getLogger(__name__)

# URL of public HTTP echo server, used primarily for AWS parity/snapshot testing
PUBLIC_HTTP_ECHO_SERVER_URL = "http://httpbin.org"

# Boto3 waiter identifiers for CloudFormation change-set/stack lifecycle operations
# (presumably consumed by CloudFormation fixtures elsewhere in this file — not visible here).
WAITER_CHANGE_SET_CREATE_COMPLETE = "change_set_create_complete"
WAITER_STACK_CREATE_COMPLETE = "stack_create_complete"
WAITER_STACK_UPDATE_COMPLETE = "stack_update_complete"
WAITER_STACK_DELETE_COMPLETE = "stack_delete_complete"

67

68
if TYPE_CHECKING:
1✔
69
    from mypy_boto3_sqs import SQSClient
×
70
    from mypy_boto3_sqs.type_defs import MessageTypeDef
×
71

72

73
@pytest.fixture(scope="session")
def aws_client_no_retry(aws_client_factory):
    """
    Returns Boto clients configured with retries disabled (max_attempts=1), for testing.
    botocore docs: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#configuring-a-retry-mode

    Prefer this client when asserting on exceptions (i.e., with pytest.raises(...)) or on expected
    error responses (e.g., status code 500): it skips the retries that the legacy retry mode would
    otherwise perform, avoiding flakiness when the tested error condition is time-bound.

    Disabling retries matters exactly for the errors, exceptions, and HTTP status codes retried by
    the legacy retry mode:
    https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#legacy-retry-mode

    General socket/connection errors:
    * ConnectionError
    * ConnectionClosedError
    * ReadTimeoutError
    * EndpointConnectionError

    Service-side throttling/limit errors and exceptions:
    * Throttling
    * ThrottlingException
    * ThrottledException
    * RequestThrottledException
    * ProvisionedThroughputExceededException

    HTTP status codes: 429, 500, 502, 503, 504, and 509

    Hence, this client is not needed for a `ResourceNotFound` error (but it doesn't harm).
    """
    single_attempt = botocore.config.Config(retries={"max_attempts": 1})
    return aws_client_factory(config=single_attempt)
103

104

105
@pytest.fixture(scope="class")
def aws_http_client_factory(aws_session):
    """
    Returns a factory for creating new ``SigningHttpClient`` instances using a configurable botocore request signer.
    The default signer is a SigV4QueryAuth. Credentials are taken from the ``aws_session`` fixture, which
    transparently uses your global profile when TEST_TARGET=AWS_CLOUD, or test credentials when running against
    LocalStack.

    Example invocations

        client = aws_http_client_factory("sqs")
        client.get("http://localhost:4566/000000000000/my-queue")

    or
        client = aws_http_client_factory("dynamodb", signer_factory=SigV4Auth)
        client.post("...")
    """

    def factory(
        service: str,
        region: str = None,
        signer_factory: Callable[
            [botocore.credentials.Credentials, str, str], botocore.auth.BaseSigner
        ] = botocore.auth.SigV4QueryAuth,
        endpoint_url: str = None,
        aws_access_key_id: str = None,
        aws_secret_access_key: str = None,
    ):
        region = region or TEST_AWS_REGION_NAME

        # explicitly supplied keys take precedence over the session's credentials
        if aws_access_key_id or aws_secret_access_key:
            credentials = botocore.credentials.Credentials(
                access_key=aws_access_key_id, secret_key=aws_secret_access_key
            )
        else:
            credentials = aws_session.get_credentials()

        frozen_creds = credentials.get_frozen_credentials()

        if not endpoint_url:
            if is_aws_cloud():
                # FIXME: this is a bit raw. we should probably re-use boto in a better way
                resolver: EndpointResolver = aws_session._session.get_component("endpoint_resolver")
                endpoint_url = "https://" + resolver.construct_endpoint(service, region)["hostname"]
            else:
                endpoint_url = config.internal_service_url()

        signer = signer_factory(frozen_creds, service, region)
        return SigningHttpClient(signer, endpoint_url=endpoint_url)

    return factory
155

156

157
@pytest.fixture(scope="class")
def s3_vhost_client(aws_client_factory, region_name):
    """Returns an S3 client configured to use virtual-host-style bucket addressing."""
    vhost_config = botocore.config.Config(s3={"addressing_style": "virtual"})
    return aws_client_factory(config=vhost_config, region_name=region_name).s3
162

163

164
@pytest.fixture
def dynamodb_wait_for_table_active(aws_client):
    """Returns a waiter that polls (up to 30s) until a DynamoDB table reaches ACTIVE state."""

    def wait_for_table_active(table_name: str, client=None):
        ddb = client or aws_client.dynamodb

        def _is_active():
            table = ddb.describe_table(TableName=table_name)["Table"]
            return table["TableStatus"] == "ACTIVE"

        poll_condition(_is_active, timeout=30)

    return wait_for_table_active
175

176

177
@pytest.fixture
def dynamodb_create_table_with_parameters(dynamodb_wait_for_table_active, aws_client):
    """Returns a factory that creates DynamoDB tables from raw boto parameters, waits until
    they are ACTIVE, and deletes them on teardown."""
    created_tables = []

    def factory(**kwargs):
        table_name = kwargs.setdefault("TableName", f"test-table-{short_uid()}")

        created_tables.append(table_name)
        response = aws_client.dynamodb.create_table(**kwargs)
        dynamodb_wait_for_table_active(table_name)
        return response

    yield factory

    # cleanup
    for table_name in created_tables:
        try:
            # table has to be in ACTIVE state before deletion
            dynamodb_wait_for_table_active(table_name)
            aws_client.dynamodb.delete_table(TableName=table_name)
        except Exception as e:
            LOG.debug("error cleaning up table %s: %s", table_name, e)
200

201

202
@pytest.fixture
def dynamodb_create_table(dynamodb_wait_for_table_active, aws_client):
    """Returns a factory around the ``create_dynamodb_table`` utility with generated defaults
    for table name and partition key; tables are deleted on teardown."""
    # beware, this swallows exception in create_dynamodb_table utility function
    created_tables = []

    def factory(**kwargs):
        kwargs["client"] = aws_client.dynamodb
        kwargs.setdefault("table_name", f"test-table-{short_uid()}")
        kwargs.setdefault("partition_key", "id")

        created_tables.append(kwargs["table_name"])

        return create_dynamodb_table(**kwargs)

    yield factory

    # cleanup
    for table_name in created_tables:
        try:
            # table has to be in ACTIVE state before deletion
            dynamodb_wait_for_table_active(table_name)
            aws_client.dynamodb.delete_table(TableName=table_name)
        except Exception as e:
            LOG.debug("error cleaning up table %s: %s", table_name, e)
228

229

230
@pytest.fixture
def s3_create_bucket(s3_empty_bucket, aws_client):
    """Returns a factory that creates S3 buckets (region-aware) and returns the bucket name;
    buckets are emptied and deleted on teardown."""
    created_buckets = []

    def factory(**kwargs) -> str:
        kwargs.setdefault("Bucket", f"test-bucket-{short_uid()}")

        # outside us-east-1, bucket creation requires an explicit location constraint
        region = aws_client.s3.meta.region_name
        if "CreateBucketConfiguration" not in kwargs and region != "us-east-1":
            kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}

        aws_client.s3.create_bucket(**kwargs)
        created_buckets.append(kwargs["Bucket"])
        return kwargs["Bucket"]

    yield factory

    # cleanup
    for bucket_name in created_buckets:
        try:
            s3_empty_bucket(bucket_name)
            aws_client.s3.delete_bucket(Bucket=bucket_name)
        except Exception as e:
            LOG.debug("error cleaning up bucket %s: %s", bucket_name, e)
259

260

261
@pytest.fixture
def s3_create_bucket_with_client(s3_empty_bucket, aws_client):
    """Like ``s3_create_bucket``, but creates the bucket with a caller-supplied S3 client and
    returns the raw create_bucket response. Cleanup uses the default ``aws_client``."""
    created_buckets = []

    def factory(s3_client, **kwargs) -> str:
        kwargs.setdefault("Bucket", f"test-bucket-{short_uid()}")

        response = s3_client.create_bucket(**kwargs)
        created_buckets.append(kwargs["Bucket"])
        return response

    yield factory

    # cleanup
    for bucket_name in created_buckets:
        try:
            s3_empty_bucket(bucket_name)
            aws_client.s3.delete_bucket(Bucket=bucket_name)
        except Exception as e:
            LOG.debug("error cleaning up bucket %s: %s", bucket_name, e)
282

283

284
@pytest.fixture
def s3_bucket(s3_create_bucket, aws_client) -> str:
    """A fresh S3 bucket in the client's region; returns its name."""
    create_kwargs = {}
    region = aws_client.s3.meta.region_name
    if region != "us-east-1":
        # outside us-east-1, an explicit location constraint is required
        create_kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
    return s3_create_bucket(**create_kwargs)
291

292

293
@pytest.fixture
def s3_empty_bucket(aws_client):
    """
    Returns a factory that given a bucket name, deletes all objects and deletes all object versions
    """

    # Boto resource would make this a straightforward task, but our internal client does not support Boto resource
    # FIXME: this won't work when bucket has more than 1000 objects
    def factory(bucket_name: str):
        delete_kwargs = {}
        try:
            # when object lock is configured, governance-retained objects need an explicit bypass
            aws_client.s3.get_object_lock_configuration(Bucket=bucket_name)
            delete_kwargs["BypassGovernanceRetention"] = True
        except ClientError:
            pass

        listing = aws_client.s3.list_objects_v2(Bucket=bucket_name)
        object_keys = [{"Key": entry["Key"]} for entry in listing.get("Contents", [])]
        if object_keys:
            aws_client.s3.delete_objects(
                Bucket=bucket_name,
                Delete={"Objects": object_keys},
                **delete_kwargs,
            )

        # versioned buckets: also remove every object version and delete marker
        version_listing = aws_client.s3.list_object_versions(Bucket=bucket_name)
        all_versions = version_listing.get("Versions", []) + version_listing.get(
            "DeleteMarkers", []
        )

        version_refs = [{"Key": v["Key"], "VersionId": v["VersionId"]} for v in all_versions]
        if version_refs:
            aws_client.s3.delete_objects(
                Bucket=bucket_name,
                Delete={"Objects": version_refs},
                **delete_kwargs,
            )

    yield factory
331

332

333
@pytest.fixture
def sqs_create_queue(aws_client):
    """Returns a factory that creates SQS queues (returning the queue URL) and deletes them
    on teardown."""
    created_queue_urls = []

    def factory(**kwargs):
        kwargs.setdefault("QueueName", f"test-queue-{short_uid()}")

        queue_url = aws_client.sqs.create_queue(**kwargs)["QueueUrl"]
        created_queue_urls.append(queue_url)

        return queue_url

    yield factory

    # cleanup
    for queue_url in created_queue_urls:
        try:
            aws_client.sqs.delete_queue(QueueUrl=queue_url)
        except Exception as e:
            LOG.debug("error cleaning up queue %s: %s", queue_url, e)
355

356

357
@pytest.fixture
def sqs_receive_messages_delete(aws_client):
    """Returns a factory that receives messages from a queue, JSON-decodes their bodies,
    deletes them, and returns the decoded payloads.

    NOTE: intentionally raises KeyError when the receive returned no "Messages" —
    ``sqs_receive_num_messages`` relies on this to detect empty receives.
    """

    def factory(
        queue_url: str,
        expected_messages: int | None = None,
        wait_time: int | None = 5,
    ):
        response = aws_client.sqs.receive_message(
            QueueUrl=queue_url,
            MessageAttributeNames=["All"],
            VisibilityTimeout=0,
            WaitTimeSeconds=wait_time,
        )
        # raises KeyError when no messages were received (deliberate, see docstring)
        decoded_messages = [json.loads(to_str(msg["Body"])) for msg in response["Messages"]]

        if expected_messages is not None:
            assert len(decoded_messages) == expected_messages

        for raw_message in response["Messages"]:
            aws_client.sqs.delete_message(
                QueueUrl=queue_url, ReceiptHandle=raw_message["ReceiptHandle"]
            )

        return decoded_messages

    return factory
386

387

388
@pytest.fixture
def sqs_receive_num_messages(sqs_receive_messages_delete):
    """Returns a factory that keeps receiving until at least ``expected_messages`` messages
    were collected (at most ``max_iterations`` receive attempts), then returns exactly that
    many; raises AssertionError otherwise."""

    def factory(queue_url: str, expected_messages: int, max_iterations: int = 3):
        collected = []
        for _ in range(max_iterations):
            try:
                batch = sqs_receive_messages_delete(queue_url, wait_time=5)
            except KeyError:
                # there were no messages
                continue
            collected.extend(batch)

            if len(collected) >= expected_messages:
                return collected[:expected_messages]

        raise AssertionError(f"max iterations reached with {len(collected)} messages received")

    return factory
406

407

408
@pytest.fixture
def sqs_collect_messages(aws_client):
    """Collects SQS messages from a given queue_url and deletes them by default.
    Example usage:
    messages = sqs_collect_messages(
         my_queue_url,
         expected=2,
         timeout=10,
         attribute_names=["All"],
         message_attribute_names=["All"],
    )
    """

    def factory(
        queue_url: str,
        expected: int,
        timeout: int,
        delete: bool = True,
        attribute_names: list[str] = None,
        message_attribute_names: list[str] = None,
        max_number_of_messages: int = 1,
        wait_time_seconds: int = 5,
        sqs_client: "SQSClient | None" = None,
    ) -> list["MessageTypeDef"]:
        client = sqs_client or aws_client.sqs
        collected = []

        def _poll_once():
            response = client.receive_message(
                QueueUrl=queue_url,
                # Maximum is 20 seconds. Performs long polling.
                WaitTimeSeconds=wait_time_seconds,
                # Maximum 10 messages
                MaxNumberOfMessages=max_number_of_messages,
                AttributeNames=attribute_names or [],
                MessageAttributeNames=message_attribute_names or [],
            )

            batch = response.get("Messages")
            if batch:
                collected.extend(batch)

                if delete:
                    for message in batch:
                        client.delete_message(
                            QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"]
                        )

            return len(collected) >= expected

        if not poll_condition(_poll_once, timeout=timeout):
            raise TimeoutError(
                f"gave up waiting for messages (expected={expected}, actual={len(collected)}"
            )

        return collected

    yield factory
465

466

467
@pytest.fixture
def sqs_queue(sqs_create_queue):
    """A fresh SQS queue; returns its queue URL."""
    return sqs_create_queue()
470

471

472
@pytest.fixture
def sqs_get_queue_arn(aws_client) -> Callable:
    """Returns a callable that resolves a queue URL to its QueueArn attribute."""

    def _get_queue_arn(queue_url: str) -> str:
        attributes = aws_client.sqs.get_queue_attributes(
            QueueUrl=queue_url, AttributeNames=["QueueArn"]
        )["Attributes"]
        return attributes["QueueArn"]

    return _get_queue_arn
480

481

482
@pytest.fixture
def sqs_queue_exists(aws_client):
    def _queue_exists(queue_url: str) -> bool:
        """
        Checks whether a queue with the given queue URL exists.
        :param queue_url: the queue URL
        :return: true if the queue exists, false otherwise
        """
        # derive the queue name from the last path segment of the URL
        queue_name = queue_url.split("/")[-1]
        try:
            result = aws_client.sqs.get_queue_url(QueueName=queue_name)
        except ClientError as e:
            if "NonExistentQueue" in e.response["Error"]["Code"]:
                return False
            raise
        return result.get("QueueUrl") == queue_url

    yield _queue_exists
499

500

501
@pytest.fixture
def sns_create_topic(aws_client):
    """Returns a factory that creates SNS topics and deletes them on teardown."""
    created_topic_arns = []

    def _create_topic(**kwargs):
        kwargs.setdefault("Name", f"test-topic-{short_uid()}")
        response = aws_client.sns.create_topic(**kwargs)
        created_topic_arns.append(response["TopicArn"])
        return response

    yield _create_topic

    for topic_arn in created_topic_arns:
        try:
            aws_client.sns.delete_topic(TopicArn=topic_arn)
        except Exception as e:
            LOG.debug("error cleaning up topic %s: %s", topic_arn, e)
519

520

521
@pytest.fixture
def sns_wait_for_topic_delete(aws_client):
    """Returns a waiter that polls (up to 30s) until the given SNS topic no longer exists."""

    def wait_for_topic_delete(topic_arn: str) -> None:
        def wait():
            try:
                aws_client.sns.get_topic_attributes(TopicArn=topic_arn)
                return False
            # fix: catch ClientError instead of bare Exception — only ClientError carries
            # the `response` mapping inspected below; a broad `except Exception` would turn
            # any other failure (e.g. connection errors) into an AttributeError on
            # `e.response`, masking the real cause
            except ClientError as e:
                if "NotFound" in e.response["Error"]["Code"]:
                    return True

                raise

        poll_condition(wait, timeout=30)

    return wait_for_topic_delete
537

538

539
@pytest.fixture
def sns_subscription(aws_client):
    """Returns a factory that subscribes an endpoint to an SNS topic and unsubscribes on
    teardown."""
    subscription_arns = []

    def _create_sub(**kwargs):
        # default to returning the full subscription ARN so it can be tracked for cleanup
        if kwargs.get("ReturnSubscriptionArn") is None:
            kwargs["ReturnSubscriptionArn"] = True

        # requires 'TopicArn', 'Protocol', and 'Endpoint'
        response = aws_client.sns.subscribe(**kwargs)
        subscription_arns.append(response["SubscriptionArn"])
        return response

    yield _create_sub

    for subscription_arn in subscription_arns:
        try:
            aws_client.sns.unsubscribe(SubscriptionArn=subscription_arn)
        except Exception as e:
            LOG.debug("error cleaning up subscription %s: %s", subscription_arn, e)
560

561

562
@pytest.fixture
def sns_topic(sns_create_topic, aws_client):
    """A fresh SNS topic; returns its attributes as reported by GetTopicAttributes."""
    arn = sns_create_topic()["TopicArn"]
    return aws_client.sns.get_topic_attributes(TopicArn=arn)
566

567

568
@pytest.fixture
def sns_allow_topic_sqs_queue(aws_client):
    """Returns a helper that grants an SNS topic permission to send messages to an SQS queue
    by installing a queue policy scoped to the topic's ARN."""

    def _allow_sns_topic(sqs_queue_url, sqs_queue_arn, sns_topic_arn) -> None:
        # allow topic to write to sqs queue
        policy = {
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {"Service": "sns.amazonaws.com"},
                    "Action": "sqs:SendMessage",
                    "Resource": sqs_queue_arn,
                    "Condition": {"ArnEquals": {"aws:SourceArn": sns_topic_arn}},
                }
            ]
        }
        aws_client.sqs.set_queue_attributes(
            QueueUrl=sqs_queue_url,
            Attributes={"Policy": json.dumps(policy)},
        )

    return _allow_sns_topic
592

593

594
@pytest.fixture
def sns_create_sqs_subscription(sns_allow_topic_sqs_queue, sqs_get_queue_arn, aws_client):
    """Returns a factory that wires an SQS queue to an SNS topic (subscription plus queue
    policy) and returns the subscription's attributes; unsubscribes on teardown."""
    subscription_arns = []

    def _factory(topic_arn: str, queue_url: str, **kwargs) -> dict[str, str]:
        queue_arn = sqs_get_queue_arn(queue_url)

        # connect sns topic to sqs
        subscription_arn = aws_client.sns.subscribe(
            TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn, **kwargs
        )["SubscriptionArn"]

        # allow topic to write to sqs queue
        sns_allow_topic_sqs_queue(
            sqs_queue_url=queue_url, sqs_queue_arn=queue_arn, sns_topic_arn=topic_arn
        )

        subscription_arns.append(subscription_arn)
        attributes_response = aws_client.sns.get_subscription_attributes(
            SubscriptionArn=subscription_arn
        )
        return attributes_response["Attributes"]

    yield _factory

    for subscription_arn in subscription_arns:
        try:
            aws_client.sns.unsubscribe(SubscriptionArn=subscription_arn)
        except Exception as e:
            LOG.error("error cleaning up subscription %s: %s", subscription_arn, e)
624

625

626
@pytest.fixture
def sns_create_http_endpoint(sns_create_topic, sns_subscription, aws_client):
    # Returns a factory that starts a local HTTP server, creates an SNS topic, and subscribes
    # the server's /sns-endpoint URL to it via the "http" protocol. The factory returns
    # (topic_arn, subscription_arn, endpoint_url, server); servers are stopped on teardown.
    #
    # This fixture can be used with manual setup to expose the HTTPServer fixture to AWS. One example is to use
    # a service like localhost.run, and set up a specific port to start the `HTTPServer(port=40000)` for example,
    # and tunnel `localhost:40000` to a specific domain that you can manually return from this fixture.
    http_servers = []

    def _create_http_endpoint(
        raw_message_delivery: bool = False,
    ) -> tuple[str, str, str, HTTPServer]:
        # start the local endpoint and make sure it is reachable before subscribing
        server = HTTPServer()
        server.start()
        http_servers.append(server)
        server.expect_request("/sns-endpoint").respond_with_data(status=200)
        endpoint_url = server.url_for("/sns-endpoint")
        wait_for_port_open(endpoint_url)

        topic_arn = sns_create_topic()["TopicArn"]
        subscription = sns_subscription(TopicArn=topic_arn, Protocol="http", Endpoint=endpoint_url)
        subscription_arn = subscription["SubscriptionArn"]
        # delivery policy with numRetries=0 so failed deliveries surface immediately in tests
        delivery_policy = {
            "healthyRetryPolicy": {
                "minDelayTarget": 1,
                "maxDelayTarget": 1,
                "numRetries": 0,
                "numNoDelayRetries": 0,
                "numMinDelayRetries": 0,
                "numMaxDelayRetries": 0,
                "backoffFunction": "linear",
            },
            "sicklyRetryPolicy": None,
            "throttlePolicy": {"maxReceivesPerSecond": 1000},
            "guaranteed": False,
        }
        aws_client.sns.set_subscription_attributes(
            SubscriptionArn=subscription_arn,
            AttributeName="DeliveryPolicy",
            AttributeValue=json.dumps(delivery_policy),
        )

        # optionally deliver the raw message body instead of the SNS JSON envelope
        if raw_message_delivery:
            aws_client.sns.set_subscription_attributes(
                SubscriptionArn=subscription_arn,
                AttributeName="RawMessageDelivery",
                AttributeValue="true",
            )

        return topic_arn, subscription_arn, endpoint_url, server

    yield _create_http_endpoint

    # teardown: stop any servers the factory started
    for http_server in http_servers:
        if http_server.is_running():
            http_server.stop()
680

681

682
@pytest.fixture
def route53_hosted_zone(aws_client):
    """Returns a factory that creates Route53 hosted zones and deletes them on teardown.

    ``Name`` and ``CallerReference`` are generated if not provided; all keyword arguments
    are forwarded to ``create_hosted_zone``.
    """
    hosted_zones = []

    def factory(**kwargs):
        if "Name" not in kwargs:
            kwargs["Name"] = f"www.{short_uid()}.com."
        if "CallerReference" not in kwargs:
            kwargs["CallerReference"] = f"caller-ref-{short_uid()}"
        # fix: forward all kwargs — previously only Name and CallerReference were passed,
        # silently dropping e.g. HostedZoneConfig, VPC, or DelegationSetId
        response = aws_client.route53.create_hosted_zone(**kwargs)
        hosted_zones.append(response["HostedZone"]["Id"])
        return response

    yield factory

    for zone in hosted_zones:
        try:
            aws_client.route53.delete_hosted_zone(Id=zone)
        except Exception as e:
            LOG.debug("error cleaning up route53 HostedZone %s: %s", zone, e)
704

705

706
@pytest.fixture
def transcribe_create_job(s3_bucket, aws_client):
    """Returns a factory that uploads an audio file to S3 and starts a transcription job for
    it, returning the job name; jobs are deleted on teardown."""
    job_names = []

    def _create_job(audio_file: str, params: dict[str, Any] | None = None) -> str:
        s3_key = "test-clip.wav"

        params = params or {}
        params.setdefault("TranscriptionJobName", f"test-transcribe-{short_uid()}")
        params.setdefault("LanguageCode", "en-GB")
        params.setdefault("Media", {"MediaFileUri": f"s3://{s3_bucket}/{s3_key}"})

        # upload test wav to a s3 bucket
        with open(audio_file, "rb") as audio:
            aws_client.s3.upload_fileobj(audio, s3_bucket, s3_key)

        response = aws_client.transcribe.start_transcription_job(**params)

        job_name = response["TranscriptionJob"]["TranscriptionJobName"]
        job_names.append(job_name)

        return job_name

    yield _create_job

    for job_name in job_names:
        with contextlib.suppress(ClientError):
            aws_client.transcribe.delete_transcription_job(TranscriptionJobName=job_name)
741

742

743
@pytest.fixture
def kinesis_create_stream(aws_client):
    """Returns a factory that creates Kinesis streams (returning the stream name) and deletes
    them on teardown."""
    created_stream_names = []

    def _create_stream(**kwargs):
        stream_name = kwargs.setdefault("StreamName", f"test-stream-{short_uid()}")
        aws_client.kinesis.create_stream(**kwargs)
        created_stream_names.append(stream_name)
        return stream_name

    yield _create_stream

    for stream_name in created_stream_names:
        try:
            aws_client.kinesis.delete_stream(StreamName=stream_name, EnforceConsumerDeletion=True)
        except Exception as e:
            LOG.debug("error cleaning up kinesis stream %s: %s", stream_name, e)
761

762

763
@pytest.fixture
def wait_for_stream_ready(aws_client):
    """Returns a waiter that polls until a Kinesis stream's status is ACTIVE or UPDATING;
    returns the poll_condition result (whether the condition was met in time)."""

    def _wait_for_stream_ready(stream_name: str):
        def is_stream_ready():
            description = aws_client.kinesis.describe_stream(StreamName=stream_name)
            status = description["StreamDescription"]["StreamStatus"]
            return status in ["ACTIVE", "UPDATING"]

        return poll_condition(is_stream_ready)

    return _wait_for_stream_ready
776

777

778
@pytest.fixture
def wait_for_delivery_stream_ready(aws_client):
    """Returns a waiter that polls until a Firehose delivery stream's status is ACTIVE;
    returns the poll_condition result (whether the stream became ACTIVE in time)."""

    def _wait_for_stream_ready(delivery_stream_name: str):
        def is_stream_ready():
            describe_stream_response = aws_client.firehose.describe_delivery_stream(
                DeliveryStreamName=delivery_stream_name
            )
            return (
                describe_stream_response["DeliveryStreamDescription"]["DeliveryStreamStatus"]
                == "ACTIVE"
            )

        # consistency fix: return the poll result so callers can assert readiness, matching
        # the sibling `wait_for_stream_ready` fixture (previously the result was discarded)
        return poll_condition(is_stream_ready)

    return _wait_for_stream_ready
793

794

795
@pytest.fixture
def wait_for_dynamodb_stream_ready(aws_client):
    """Returns a waiter that polls until a DynamoDB stream's status is ENABLED; returns the
    poll_condition result (whether the condition was met in time)."""

    def _wait_for_stream_ready(stream_arn: str, client=None):
        def is_stream_ready():
            streams_client = client or aws_client.dynamodbstreams
            description = streams_client.describe_stream(StreamArn=stream_arn)
            return description["StreamDescription"]["StreamStatus"] == "ENABLED"

        return poll_condition(is_stream_ready)

    return _wait_for_stream_ready
806

807

808
@pytest.fixture()
def kms_create_key(aws_client_factory):
    """Returns a factory that creates KMS keys (optionally in a specific region) and
    schedules them for deletion on teardown."""
    created_keys = []

    def _create_key(region_name: str = None, **kwargs):
        kwargs.setdefault("Description", f"test description - {short_uid()}")
        kms_client = aws_client_factory(region_name=region_name).kms
        key_metadata = kms_client.create_key(**kwargs)["KeyMetadata"]
        created_keys.append((region_name, key_metadata["KeyId"]))
        return key_metadata

    yield _create_key

    for region_name, key_id in created_keys:
        try:
            # shortest amount of time you can schedule the deletion
            aws_client_factory(region_name=region_name).kms.schedule_key_deletion(
                KeyId=key_id, PendingWindowInDays=7
            )
        except Exception as e:
            message = str(e)
            # Some tests schedule their keys for deletion themselves; only log failures
            # that are not the expected "already pending deletion" case.
            already_pending = (
                "KMSInvalidStateException" in message and "is pending deletion" in message
            )
            if not already_pending:
                LOG.debug("error cleaning up KMS key %s: %s", key_id, e)
837

838

839
@pytest.fixture()
def kms_replicate_key(aws_client_factory):
    """Factory fixture to replicate KMS multi-region keys into another region.

    On teardown, schedules every replica for deletion in its target region.
    """
    replicas = []  # (replica_region, key_id) pairs

    def _replicate_key(region_from=None, **kwargs):
        replicas.append((kwargs.get("ReplicaRegion"), kwargs.get("KeyId")))
        return aws_client_factory(region_name=region_from).kms.replicate_key(**kwargs)

    yield _replicate_key

    for replica_region, key_id in replicas:
        try:
            # shortest amount of time you can schedule the deletion
            aws_client_factory(region_name=replica_region).kms.schedule_key_deletion(
                KeyId=key_id, PendingWindowInDays=7
            )
        except Exception as e:
            LOG.debug("error cleaning up KMS key %s: %s", key_id, e)
×
858

859

860
# The kms_create_key fixture is used here not just to be able to create aliases without a key specified,
# but also to make sure that kms_create_key is set up before and torn down after kms_create_alias -
# ensuring that aliases are cleaned up before the keys themselves.
863
@pytest.fixture()
def kms_create_alias(kms_create_key, aws_client):
    """Factory fixture to create KMS aliases; deletes created aliases on teardown.

    If no target key is supplied, a fresh key is created via ``kms_create_key``.
    """
    created_aliases = []

    def _create_alias(**kwargs):
        if "AliasName" not in kwargs:
            kwargs["AliasName"] = f"alias/{short_uid()}"
        if "TargetKeyId" not in kwargs:
            kwargs["TargetKeyId"] = kms_create_key()["KeyId"]

        aws_client.kms.create_alias(**kwargs)
        alias_name = kwargs["AliasName"]
        created_aliases.append(alias_name)
        return alias_name

    yield _create_alias

    for alias in created_aliases:
        try:
            aws_client.kms.delete_alias(AliasName=alias)
        except Exception as e:
            LOG.debug("error cleaning up KMS alias %s: %s", alias, e)
1✔
884

885

886
@pytest.fixture()
def kms_create_grant(kms_create_key, aws_client):
    """Factory fixture to create KMS grants; retires created grants on teardown.

    Returns ``(grant_id, key_id)`` tuples; missing arguments get sensible defaults.
    """
    created_grants = []  # (grant_id, key_id) pairs

    def _create_grant(**kwargs):
        # Just a random ARN, since KMS in LocalStack currently doesn't validate GranteePrincipal,
        # but some GranteePrincipal is required to create a grant.
        default_grantee_principal = (
            "arn:aws:kms:eu-central-1:123456789876:key/198a5a78-52c3-489f-ac70-b06a4d11027a"
        )

        if "Operations" not in kwargs:
            kwargs["Operations"] = ["Decrypt", "Encrypt"]
        if "GranteePrincipal" not in kwargs:
            kwargs["GranteePrincipal"] = default_grantee_principal
        if "KeyId" not in kwargs:
            kwargs["KeyId"] = kms_create_key()["KeyId"]

        grant_id = aws_client.kms.create_grant(**kwargs)["GrantId"]
        created_grants.append((grant_id, kwargs["KeyId"]))
        return grant_id, kwargs["KeyId"]

    yield _create_grant

    for grant_id, key_id in created_grants:
        try:
            aws_client.kms.retire_grant(GrantId=grant_id, KeyId=key_id)
        except Exception as e:
            LOG.debug("error cleaning up KMS grant %s: %s", grant_id, e)
×
915

916

917
@pytest.fixture
def kms_key(kms_create_key):
    # Convenience fixture: a single KMS key (KeyMetadata dict) created with default settings.
    return kms_create_key()
1✔
920

921

922
@pytest.fixture
def kms_grant_and_key(kms_key, aws_client):
    """Create a Decrypt/Encrypt grant for the current caller identity on ``kms_key``.

    Returns a two-element list: [create_grant response, key metadata].
    """
    user_arn = aws_client.sts.get_caller_identity()["Arn"]

    grant = aws_client.kms.create_grant(
        KeyId=kms_key["KeyId"],
        GranteePrincipal=user_arn,
        Operations=["Decrypt", "Encrypt"],
    )
    return [grant, kms_key]
934

935

936
@pytest.fixture
def opensearch_wait_for_cluster(aws_client):
    """Fixture returning a helper that blocks until an OpenSearch domain is active.

    Asserts (fails the test) if the domain does not come up within 25 minutes.
    """

    def _wait_for_cluster(domain_name: str):
        def finished_processing():
            status = aws_client.opensearch.describe_domain(DomainName=domain_name)["DomainStatus"]
            return status["DomainProcessingStatus"] == "Active" and "Endpoint" in status

        # poll less aggressively against real AWS
        poll_kwargs = {"interval": 10} if is_aws_cloud() else {}
        assert poll_condition(finished_processing, timeout=25 * 60, **poll_kwargs), (
            f"could not start domain: {domain_name}"
        )

    return _wait_for_cluster
1✔
948

949

950
@pytest.fixture
def opensearch_create_domain(opensearch_wait_for_cluster, aws_client):
    """Factory fixture to create OpenSearch domains; deletes them on teardown.

    Blocks until the created domain is active and returns the domain name.
    """
    created_domains = []

    def factory(**kwargs) -> str:
        if "DomainName" not in kwargs:
            kwargs["DomainName"] = f"test-domain-{short_uid()}"
        domain_name = kwargs["DomainName"]

        aws_client.opensearch.create_domain(**kwargs)
        opensearch_wait_for_cluster(domain_name=domain_name)

        created_domains.append(domain_name)
        return domain_name

    yield factory

    # cleanup
    for domain in created_domains:
        try:
            aws_client.opensearch.delete_domain(DomainName=domain)
        except Exception as e:
            LOG.debug("error cleaning up domain %s: %s", domain, e)
×
973

974

975
@pytest.fixture
def opensearch_domain(opensearch_create_domain) -> str:
    # Convenience fixture: a single OpenSearch domain created with defaults; returns its name.
    return opensearch_create_domain()
1✔
978

979

980
@pytest.fixture
def opensearch_endpoint(opensearch_domain, aws_client) -> str:
    """Return the HTTPS endpoint URL of the ``opensearch_domain`` fixture's domain."""
    domain_status = aws_client.opensearch.describe_domain(DomainName=opensearch_domain)[
        "DomainStatus"
    ]
    assert "Endpoint" in domain_status
    return f"https://{domain_status['Endpoint']}"
1✔
985

986

987
@pytest.fixture
def opensearch_document_path(opensearch_endpoint, aws_client):
    """Index a sample document into the test domain and return its document URL.

    Fails the test if the document could not be created (non-201 response).
    """
    sample_document = {
        "first_name": "Boba",
        "last_name": "Fett",
        "age": 41,
        "about": "I'm just a simple man, trying to make my way in the universe.",
        "interests": ["mandalorian armor", "tusken culture"],
    }
    document_path = f"{opensearch_endpoint}/bountyhunters/_doc/1"
    response = requests.put(
        document_path,
        data=json.dumps(sample_document),
        headers={"content-type": "application/json", "Accept-encoding": "identity"},
    )
    assert response.status_code == 201, f"could not create document at: {document_path}"
    return document_path
1✔
1004

1005

1006
# Cleanup fixtures
1007
@pytest.fixture
def cleanup_stacks(aws_client):
    """Fixture returning a helper that deletes CloudFormation stacks (best-effort)
    and waits for each deletion to complete."""

    def _cleanup_stacks(stacks: list[str]) -> None:
        for stack in ensure_list(stacks):
            try:
                aws_client.cloudformation.delete_stack(StackName=stack)
                aws_client.cloudformation.get_waiter("stack_delete_complete").wait(StackName=stack)
            except Exception:
                LOG.debug("Failed to cleanup stack '%s'", stack)

    return _cleanup_stacks
1✔
1019

1020

1021
@pytest.fixture
def cleanup_changesets(aws_client):
    """Fixture returning a helper that deletes CloudFormation change sets (best-effort)."""

    def _cleanup_changesets(changesets: list[str]) -> None:
        for cs in ensure_list(changesets):
            try:
                aws_client.cloudformation.delete_change_set(ChangeSetName=cs)
            except Exception:
                LOG.debug("Failed to cleanup changeset '%s'", cs)

    return _cleanup_changesets
1✔
1032

1033

1034
# Helpers for Cfn
1035

1036

1037
# TODO: exports(!)
1038
@dataclasses.dataclass(frozen=True)
class DeployResult:
    """Result of a successful ``deploy_cfn_template`` deployment."""

    # ARN/ID of the executed change set
    change_set_id: str
    # ARN/ID of the deployed stack
    stack_id: str
    # name of the deployed stack
    stack_name: str
    # name of the executed change set
    change_set_name: str
    # stack outputs mapped OutputKey -> OutputValue
    outputs: dict[str, str]

    # callable that deletes the stack and waits for deletion to finish
    destroy: Callable[[], None]
1✔
1047

1048

1049
class StackDeployError(Exception):
    """Raised when a CloudFormation stack fails to reach its target state.

    Carries the ``describe_stacks`` output and the stack events so the failure
    can be diagnosed from the exception message alone.
    """

    def __init__(self, describe_res: dict, events: list[dict]):
        self.describe_result = describe_res
        self.events = events

        encoded_describe_output = json.dumps(self.describe_result, cls=CustomEncoder)
        # with verbose errors enabled, format_events includes all events, not only failures
        label = "Events" if config.CFN_VERBOSE_ERRORS else "Failing resources"
        msg = f"Describe output:\n{encoded_describe_output}\n{label}:\n{self.format_events(events)}"

        super().__init__(msg)

    def format_events(self, events: list[dict]) -> str:
        """Render events chronologically; only failures unless CFN_VERBOSE_ERRORS is set."""
        chronological = sorted(events, key=lambda ev: ev["Timestamp"])
        return "\n".join(
            self.format_event(ev)
            for ev in chronological
            if ev["ResourceStatus"].endswith("FAILED") or config.CFN_VERBOSE_ERRORS
        )

    @staticmethod
    def format_event(event: dict) -> str:
        """Render a single stack event as a one-line summary."""
        line = f"- {event['LogicalResourceId']} ({event['ResourceType']}) -> {event['ResourceStatus']}"
        if reason := event.get("ResourceStatusReason"):
            reason = reason.replace("\n", "; ")
            line = f"{line} ({reason})"
        return line
1✔
1079

1080

1081
@pytest.fixture
def deploy_cfn_template(
    aws_client: ServiceLevelClientFactory,
):
    """Factory fixture that deploys a CloudFormation template via a change set.

    Returns a ``_deploy`` callable; all stacks created through it are deleted
    (in reverse creation order) on fixture teardown.
    """
    # (stack_id, destroy_callable) pairs, appended per successful deployment
    state: list[tuple[str, Callable]] = []

    def _deploy(
        *,
        is_update: bool | None = False,
        stack_name: str | None = None,
        change_set_name: str | None = None,
        template: str | None = None,
        template_path: str | os.PathLike | None = None,
        template_mapping: dict[str, Any] | None = None,
        parameters: dict[str, str] | None = None,
        role_arn: str | None = None,
        max_wait: int | None = None,
        delay_between_polls: int | None = 2,
        custom_aws_client: ServiceLevelClientFactory | None = None,
        raw_parameters: list[Parameter] | None = None,
    ) -> DeployResult:
        """Create and execute a change set, wait for completion, and return a DeployResult.

        :param is_update: deploy as an UPDATE change set against an existing stack
            (then ``stack_name`` is required); otherwise CREATE a new stack
        :param template / template_path: template body, or a file to load it from
            (``template_path`` takes precedence)
        :param template_mapping: values rendered into the template via ``render_template``
        :param parameters: simple key/value stack parameters; ``raw_parameters`` may
            instead supply pre-built Parameter dicts (ignored when ``parameters`` is set)
        :param max_wait: overall wait budget in seconds (defaults: 1800 on AWS, 180 locally)
        :param custom_aws_client: optional client factory override (e.g. other account/region)
        :raises RuntimeError: if ``template_path`` cannot be loaded
        :raises StackDeployError: if the stack does not reach its target state
        """
        if is_update:
            assert stack_name
        stack_name = stack_name or f"stack-{short_uid()}"
        change_set_name = change_set_name or f"change-set-{short_uid()}"

        if max_wait is None:
            max_wait = 1800 if is_aws_cloud() else 180

        if template_path is not None:
            template = load_template_file(template_path)
            if template is None:
                raise RuntimeError(f"Could not find file {os.path.realpath(template_path)}")
        template_rendered = render_template(template, **(template_mapping or {}))

        kwargs = CreateChangeSetInput(
            StackName=stack_name,
            ChangeSetName=change_set_name,
            TemplateBody=template_rendered,
            Capabilities=["CAPABILITY_AUTO_EXPAND", "CAPABILITY_IAM", "CAPABILITY_NAMED_IAM"],
            ChangeSetType=("UPDATE" if is_update else "CREATE"),
        )
        kwargs["Parameters"] = []
        if parameters:
            kwargs["Parameters"] = [
                Parameter(ParameterKey=k, ParameterValue=v) for (k, v) in parameters.items()
            ]
        elif raw_parameters:
            kwargs["Parameters"] = raw_parameters

        if role_arn is not None:
            kwargs["RoleARN"] = role_arn

        cfn_aws_client = custom_aws_client if custom_aws_client is not None else aws_client

        response = cfn_aws_client.cloudformation.create_change_set(**kwargs)

        change_set_id = response["Id"]
        stack_id = response["StackId"]

        # wait until the change set itself is created before executing it
        try:
            cfn_aws_client.cloudformation.get_waiter(WAITER_CHANGE_SET_CREATE_COMPLETE).wait(
                ChangeSetName=change_set_id
            )
        except botocore.exceptions.WaiterError as e:
            # surface the change set's own failure reason instead of the generic waiter error
            change_set = cfn_aws_client.cloudformation.describe_change_set(
                ChangeSetName=change_set_id
            )
            raise Exception(f"{change_set['Status']}: {change_set.get('StatusReason')}") from e

        cfn_aws_client.cloudformation.execute_change_set(ChangeSetName=change_set_id)
        stack_waiter = cfn_aws_client.cloudformation.get_waiter(
            WAITER_STACK_UPDATE_COMPLETE if is_update else WAITER_STACK_CREATE_COMPLETE
        )

        try:
            stack_waiter.wait(
                StackName=stack_id,
                WaiterConfig={
                    "Delay": delay_between_polls,
                    # NOTE(review): true division may yield a float here; botocore appears
                    # to accept it, but an integer division would be cleaner — confirm
                    "MaxAttempts": max_wait / delay_between_polls,
                },
            )
        except botocore.exceptions.WaiterError as e:
            # bundle describe output + events into a diagnosable exception
            raise StackDeployError(
                cfn_aws_client.cloudformation.describe_stacks(StackName=stack_id)["Stacks"][0],
                cfn_aws_client.cloudformation.describe_stack_events(StackName=stack_id)[
                    "StackEvents"
                ],
            ) from e

        describe_stack_res = cfn_aws_client.cloudformation.describe_stacks(StackName=stack_id)[
            "Stacks"
        ][0]
        outputs = describe_stack_res.get("Outputs", [])

        mapped_outputs = {o["OutputKey"]: o.get("OutputValue") for o in outputs}

        def _destroy_stack():
            # delete the stack and block until deletion has finished
            cfn_aws_client.cloudformation.delete_stack(StackName=stack_id)
            cfn_aws_client.cloudformation.get_waiter(WAITER_STACK_DELETE_COMPLETE).wait(
                StackName=stack_id,
                WaiterConfig={
                    "Delay": delay_between_polls,
                    "MaxAttempts": max_wait / delay_between_polls,
                },
            )

        state.append((stack_id, _destroy_stack))

        return DeployResult(
            change_set_id, stack_id, stack_name, change_set_name, mapped_outputs, _destroy_stack
        )

    yield _deploy

    # delete the stacks in the reverse order they were created in case of inter-stack dependencies
    for stack_id, teardown in state[::-1]:
        try:
            teardown()
        except Exception as e:
            LOG.debug("Failed cleaning up stack stack_id=%s: %s", stack_id, e)
1✔
1203

1204

1205
@pytest.fixture
def is_change_set_created_and_available(aws_client):
    """Fixture returning a factory for poll predicates: change set created and executable."""

    def _is_change_set_created_and_available(change_set_id: str):
        def _inner():
            change_set = aws_client.cloudformation.describe_change_set(ChangeSetName=change_set_id)
            # TODO: CREATE_FAILED should also not lead to further retries
            is_created = change_set.get("Status") == "CREATE_COMPLETE"
            is_available = change_set.get("ExecutionStatus") == "AVAILABLE"
            return is_created and is_available

        return _inner

    return _is_change_set_created_and_available
1✔
1219

1220

1221
@pytest.fixture
def is_change_set_failed_and_unavailable(aws_client):
    """Fixture returning a factory for poll predicates: change set FAILED and not executable."""

    # renamed from the copy-pasted `_is_change_set_created_and_available` so the
    # internal names match the fixture's actual semantics
    def _is_change_set_failed_and_unavailable(change_set_id: str):
        def _inner():
            change_set = aws_client.cloudformation.describe_change_set(ChangeSetName=change_set_id)
            return (
                # TODO: CREATE_FAILED should also not lead to further retries
                change_set.get("Status") == "FAILED"
                and change_set.get("ExecutionStatus") == "UNAVAILABLE"
            )

        return _inner

    return _is_change_set_failed_and_unavailable
×
1235

1236

1237
@pytest.fixture
def is_stack_created(aws_client):
    # Poll-predicate factory: true once the stack reaches a terminal CREATE state.
    return _has_stack_status(aws_client.cloudformation, ["CREATE_COMPLETE", "CREATE_FAILED"])
1✔
1240

1241

1242
@pytest.fixture
def is_stack_updated(aws_client):
    # Poll-predicate factory: true once the stack reaches a terminal UPDATE state.
    return _has_stack_status(aws_client.cloudformation, ["UPDATE_COMPLETE", "UPDATE_FAILED"])
×
1245

1246

1247
@pytest.fixture
def is_stack_deleted(aws_client):
    # Poll-predicate factory: true once the stack has been fully deleted.
    return _has_stack_status(aws_client.cloudformation, ["DELETE_COMPLETE"])
×
1250

1251

1252
def _has_stack_status(cfn_client, statuses: list[str]):
1✔
1253
    def _has_status(stack_id: str):
1✔
1254
        def _inner():
1✔
1255
            resp = cfn_client.describe_stacks(StackName=stack_id)
1✔
1256
            s = resp["Stacks"][0]  # since the lookup  uses the id we can only get a single response
1✔
1257
            return s.get("StackStatus") in statuses
1✔
1258

1259
        return _inner
1✔
1260

1261
    return _has_status
1✔
1262

1263

1264
@pytest.fixture
def is_change_set_finished(aws_client):
    """Fixture returning a predicate factory for change set execution completion.

    The predicate raises ``ShortCircuitWaitException`` when execution failed,
    so wait helpers abort early instead of polling until timeout.
    """

    def _is_change_set_finished(change_set_id: str, stack_name: str | None = None):
        def _inner():
            describe_kwargs = {"ChangeSetName": change_set_id}
            if stack_name:
                describe_kwargs["StackName"] = stack_name

            change_set = aws_client.cloudformation.describe_change_set(**describe_kwargs)
            execution_status = change_set.get("ExecutionStatus")

            if execution_status == "EXECUTE_FAILED":
                LOG.warning("Change set failed")
                raise ShortCircuitWaitException()

            return execution_status == "EXECUTE_COMPLETE"

        return _inner

    return _is_change_set_finished
1✔
1283

1284

1285
@pytest.fixture
def wait_until_lambda_ready(aws_client):
    """Fixture returning a helper that blocks until a Lambda function leaves the Pending state.

    :param function_name: name (or ARN) of the function to wait for
    :param qualifier: optional version/alias qualifier to check
    :param client: optional Lambda client override (defaults to ``aws_client.lambda_``)
    """

    def _wait_until_ready(function_name: str, qualifier: str = None, client=None):
        client = client or aws_client.lambda_

        def _is_not_pending():
            kwargs = {}
            if qualifier:
                kwargs["Qualifier"] = qualifier
            try:
                # bugfix: pass the qualifier through to get_function — previously the
                # kwargs dict was built but never used, so the qualifier was ignored
                result = (
                    client.get_function(FunctionName=function_name, **kwargs)["Configuration"][
                        "State"
                    ]
                    != "Pending"
                )
                LOG.debug("lambda state result: result=%s", result)
                return result
            except Exception as e:
                LOG.error(e)
                raise

        wait_until(_is_not_pending)

    return _wait_until_ready
1✔
1308

1309

1310
role_assume_policy = """
1✔
1311
{
1312
  "Version": "2012-10-17",
1313
  "Statement": [
1314
    {
1315
      "Effect": "Allow",
1316
      "Principal": {
1317
        "Service": "lambda.amazonaws.com"
1318
      },
1319
      "Action": "sts:AssumeRole"
1320
    }
1321
  ]
1322
}
1323
""".strip()
1324

1325
role_policy = """
1✔
1326
{
1327
    "Version": "2012-10-17",
1328
    "Statement": [
1329
        {
1330
            "Effect": "Allow",
1331
            "Action": [
1332
                "logs:CreateLogGroup",
1333
                "logs:CreateLogStream",
1334
                "logs:PutLogEvents"
1335
            ],
1336
            "Resource": [
1337
                "*"
1338
            ]
1339
        }
1340
    ]
1341
}
1342
""".strip()
1343

1344

1345
@pytest.fixture
def create_lambda_function_aws(aws_client):
    """Factory fixture wrapping the raw ``create_function`` API.

    Waits until each function leaves the Pending state; deletes all created
    functions on teardown.
    """
    function_arns = []

    def _create_lambda_function(**kwargs):
        def _create_function():
            response = aws_client.lambda_.create_function(**kwargs)
            function_arns.append(response["FunctionArn"])

            def _is_not_pending():
                try:
                    state = aws_client.lambda_.get_function(
                        FunctionName=response["FunctionName"]
                    )["Configuration"]["State"]
                    return state != "Pending"
                except Exception as e:
                    LOG.error(e)
                    raise

            wait_until(_is_not_pending)
            return response

        # @AWS, takes about 10s until the role/policy is "active", until then it will fail
        # localstack should normally not require the retries and will just continue here
        return retry(_create_function, retries=3, sleep=4)

    yield _create_lambda_function

    for arn in function_arns:
        try:
            aws_client.lambda_.delete_function(FunctionName=arn)
        except Exception:
            LOG.debug("Unable to delete function arn=%s in cleanup", arn)
×
1381

1382

1383
@pytest.fixture
def create_lambda_function(aws_client, wait_until_lambda_ready, lambda_su_role):
    """Factory fixture to create Lambda functions via ``testutil.create_lambda_function``.

    Waits until each function leaves the Pending state. On teardown, deletes the
    created functions and their ``/aws/lambda/<name>`` log groups.
    """
    created_functions = []  # (function_arn, client) pairs
    created_log_groups = []
    lambda_client = aws_client.lambda_
    logs_client = aws_client.logs
    s3_client = aws_client.s3

    def _create_lambda_function(*args, **kwargs):
        client = kwargs.get("client") or lambda_client
        kwargs["client"] = client
        kwargs["s3_client"] = s3_client
        func_name = kwargs.pop("func_name", None)
        assert func_name

        # fall back to the superuser role when no (truthy) role was provided
        if not kwargs.get("role"):
            kwargs["role"] = lambda_su_role

        def _create_function():
            response = testutil.create_lambda_function(func_name, **kwargs)
            created_functions.append(
                (response["CreateFunctionResponse"]["FunctionArn"], client)
            )
            wait_until_lambda_ready(function_name=func_name, client=client)
            created_log_groups.append(f"/aws/lambda/{func_name}")
            return response

        # @AWS, takes about 10s until the role/policy is "active", until then it will fail
        # localstack should normally not require the retries and will just continue here
        return retry(_create_function, retries=3, sleep=4)

    yield _create_lambda_function

    for arn, client in created_functions:
        try:
            client.delete_function(FunctionName=arn)
        except Exception:
            LOG.debug("Unable to delete function arn=%s in cleanup", arn)

    for log_group_name in created_log_groups:
        try:
            logs_client.delete_log_group(logGroupName=log_group_name)
        except Exception:
            LOG.debug("Unable to delete log group %s in cleanup", log_group_name)
1✔
1427

1428

1429
@pytest.fixture
def lambda_is_function_deleted(aws_client):
    """Predicate factory reporting whether a Lambda function no longer exists.

    Example usage:
    wait_until(lambda_is_function_deleted(function_name))
    wait_until(lambda_is_function_deleted(function_name, Qualifier="my-alias"))

    function_name can be a function name, function ARN, or partial function ARN.
    """
    return _lambda_is_function_deleted(aws_client.lambda_)
×
1438

1439

1440
def _lambda_is_function_deleted(lambda_client):
1✔
NEW
1441
    def _is_function_deleted(
×
1442
        function_name: str,
1443
        **kwargs,
1444
    ) -> Callable[[], bool]:
NEW
1445
        def _inner() -> bool:
×
NEW
1446
            try:
×
NEW
1447
                lambda_client.get_function(FunctionName=function_name, **kwargs)
×
NEW
1448
                return False
×
NEW
1449
            except lambda_client.exceptions.ResourceNotFoundException:
×
NEW
1450
                return True
×
1451

NEW
1452
        return _inner
×
1453

NEW
1454
    return _is_function_deleted
×
1455

1456

1457
@pytest.fixture
def create_echo_http_server(aws_client, create_lambda_function):
    """Factory fixture: deploy a Lambda behind a function URL that echoes each request."""
    from localstack.aws.api.lambda_ import Runtime

    lambda_client = aws_client.lambda_
    # Lambda handler source deployed as the echo server. It mirrors the incoming
    # request (args/body/headers/method/origin/path) back in the response body;
    # with env var TRIM_X_HEADERS=1 it drops x-amzn*/x-forwarded-* headers.
    handler_code = textwrap.dedent(
        """
    import json
    import os


    def make_response(body: dict, status_code: int = 200):
        return {
            "statusCode": status_code,
            "headers": {"Content-Type": "application/json"},
            "body": body,
        }


    def trim_headers(headers):
        if not int(os.getenv("TRIM_X_HEADERS", 0)):
            return headers
        return {
            key: value for key, value in headers.items()
            if not (key.startswith("x-amzn") or key.startswith("x-forwarded-"))
        }


    def handler(event, context):
        print(json.dumps(event))
        response = {
            "args": event.get("queryStringParameters", {}),
            "data": event.get("body", ""),
            "domain": event["requestContext"].get("domainName", ""),
            "headers": trim_headers(event.get("headers", {})),
            "method": event["requestContext"]["http"].get("method", ""),
            "origin": event["requestContext"]["http"].get("sourceIp", ""),
            "path": event["requestContext"]["http"].get("path", ""),
        }
        return make_response(response)"""
    )

    def _create_echo_http_server(trim_x_headers: bool = False) -> str:
        """Creates a server that will echo any request. Any request will be returned with the
        following format. Any unset values will have those defaults.
        `trim_x_headers` can be set to True to trim some headers that are automatically added by lambda in
        order to create easier Snapshot testing. Default: `False`
        {
          "args": {},
          "headers": {},
          "data": "",
          "method": "",
          "domain": "",
          "origin": "",
          "path": ""
        }"""
        zip_file = testutil.create_lambda_archive(handler_code, get_content=True)
        func_name = f"echo-http-{short_uid()}"
        create_lambda_function(
            func_name=func_name,
            zip_file=zip_file,
            runtime=Runtime.python3_12,
            envvars={"TRIM_X_HEADERS": "1" if trim_x_headers else "0"},
        )
        # expose the function via a public (AuthType NONE) function URL
        url_response = lambda_client.create_function_url_config(
            FunctionName=func_name, AuthType="NONE"
        )
        # allow unauthenticated invocations of the function URL
        aws_client.lambda_.add_permission(
            FunctionName=func_name,
            StatementId="urlPermission",
            Action="lambda:InvokeFunctionUrl",
            Principal="*",
            FunctionUrlAuthType="NONE",
        )
        return url_response["FunctionUrl"]

    yield _create_echo_http_server
1✔
1534

1535

1536
@pytest.fixture
def create_event_source_mapping(aws_client):
    """Factory fixture for Lambda event source mappings; deletes created mappings on teardown."""
    mapping_uuids = []

    def _create_event_source_mapping(*args, **kwargs):
        response = aws_client.lambda_.create_event_source_mapping(*args, **kwargs)
        mapping_uuids.append(response["UUID"])
        return response

    yield _create_event_source_mapping

    for uuid in mapping_uuids:
        try:
            aws_client.lambda_.delete_event_source_mapping(UUID=uuid)
        except Exception:
            LOG.debug("Unable to delete event source mapping %s in cleanup", uuid)
×
1552

1553

1554
@pytest.fixture
def check_lambda_logs(aws_client):
    """Fixture returning a helper that asserts expected lines appear in a function's logs.

    Expected lines containing ``.*`` are treated as regex patterns (matched with
    DOTALL); all other lines must be present verbatim. Returns all log messages.
    """

    def _check_logs(func_name: str, expected_lines: list[str] = None) -> list[str]:
        expected = expected_lines or []
        log_events = get_lambda_logs(func_name, logs_client=aws_client.logs)
        log_messages = [event["message"] for event in log_events]
        for line in expected:
            if ".*" in line and any(
                re.match(line, message, flags=re.DOTALL) for message in log_messages
            ):
                continue
            assert line in log_messages
        return log_messages

    return _check_logs
1✔
1570

1571

1572
@pytest.fixture
def create_policy(aws_client):
    """Factory fixture to create IAM policies; deletes created policies on teardown.

    An ``iam_client`` keyword argument may override the default IAM client.
    """
    created_policies = []  # (policy_arn, iam_client) pairs

    def _create_policy(*args, iam_client=None, **kwargs):
        client = iam_client or aws_client.iam
        if "PolicyName" not in kwargs:
            kwargs["PolicyName"] = f"policy-{short_uid()}"
        response = client.create_policy(*args, **kwargs)
        created_policies.append((response["Policy"]["Arn"], client))
        return response

    yield _create_policy

    for policy_arn, client in created_policies:
        try:
            client.delete_policy(PolicyArn=policy_arn)
        except Exception:
            LOG.debug("Could not delete policy '%s' during test cleanup", policy_arn)
1✔
1592

1593

1594
@pytest.fixture
def create_user(aws_client):
    """Factory fixture creating IAM users.

    On teardown, each user's inline policies, attached policies and access keys
    are removed before the user itself is deleted.
    """
    created_users = []

    def _create_user(**kwargs):
        kwargs.setdefault("UserName", f"user-{short_uid()}")
        response = aws_client.iam.create_user(**kwargs)
        created_users.append(response["User"]["UserName"])
        return response

    yield _create_user

    for user in created_users:
        try:
            policy_names = aws_client.iam.list_user_policies(UserName=user)["PolicyNames"]
        except ClientError as e:
            LOG.debug(
                "Cannot list user policies: %s. User %s probably already deleted...", e, user
            )
            continue

        for policy_name in policy_names:
            try:
                aws_client.iam.delete_user_policy(UserName=user, PolicyName=policy_name)
            except Exception:
                LOG.debug(
                    "Could not delete user policy '%s' from '%s' during cleanup",
                    policy_name,
                    user,
                )
        attached = aws_client.iam.list_attached_user_policies(UserName=user)["AttachedPolicies"]
        for attached_policy in attached:
            try:
                aws_client.iam.detach_user_policy(
                    UserName=user, PolicyArn=attached_policy["PolicyArn"]
                )
            except Exception:
                LOG.debug(
                    "Error detaching policy '%s' from user '%s'",
                    attached_policy["PolicyArn"],
                    user,
                )
        for access_key in aws_client.iam.list_access_keys(UserName=user)["AccessKeyMetadata"]:
            try:
                aws_client.iam.delete_access_key(
                    UserName=user, AccessKeyId=access_key["AccessKeyId"]
                )
            except Exception:
                LOG.debug(
                    "Error deleting access key '%s' from user '%s'",
                    access_key["AccessKeyId"],
                    user,
                )

        try:
            aws_client.iam.delete_user(UserName=user)
        except Exception as e:
            LOG.debug("Error deleting user '%s' during test cleanup: %s", user, e)
1656

1657

1658
@pytest.fixture
def wait_and_assume_role(aws_client):
    def _wait_and_assume_role(role_arn: str, session_name: str = None, **kwargs):
        """Assume the given role, retrying until allowed, and return the temporary credentials."""
        session_name = session_name or f"session-{short_uid()}"

        def _assume():
            response = aws_client.sts.assume_role(
                RoleArn=role_arn, RoleSessionName=session_name, **kwargs
            )
            return response["Credentials"]

        # need to retry a couple of times before we are allowed to assume this role in AWS
        return retry(_assume, sleep=5, retries=4)

    return _wait_and_assume_role
1674

1675

1676
@pytest.fixture
def create_role(aws_client):
    """Factory fixture creating IAM roles.

    On teardown, attached managed policies are detached and inline policies
    deleted before the role itself is removed.
    """
    created_roles = []

    def _create_role(iam_client=None, **kwargs):
        client = iam_client or aws_client.iam
        if not kwargs.get("RoleName"):
            kwargs["RoleName"] = f"role-{short_uid()}"
        result = client.create_role(**kwargs)
        created_roles.append((result["Role"]["RoleName"], client))
        return result

    yield _create_role

    for role, client in created_roles:
        # detach policies
        try:
            attached = client.list_attached_role_policies(RoleName=role)["AttachedPolicies"]
        except ClientError as e:
            LOG.debug(
                "Cannot list attached role policies: %s. Role %s probably already deleted...",
                e,
                role,
            )
            continue
        for attached_policy in attached:
            try:
                client.detach_role_policy(RoleName=role, PolicyArn=attached_policy["PolicyArn"])
            except Exception:
                LOG.debug(
                    "Could not detach role policy '%s' from '%s' during cleanup",
                    attached_policy["PolicyArn"],
                    role,
                )
        for policy_name in client.list_role_policies(RoleName=role)["PolicyNames"]:
            try:
                client.delete_role_policy(RoleName=role, PolicyName=policy_name)
            except Exception:
                LOG.debug(
                    "Could not delete role policy '%s' from '%s' during cleanup",
                    policy_name,
                    role,
                )
        try:
            client.delete_role(RoleName=role)
        except Exception:
            LOG.debug("Could not delete role '%s' during cleanup", role)
1728

1729

1730
@pytest.fixture
def create_parameter(aws_client):
    """Factory fixture creating SSM parameters, deleted again on teardown."""
    parameter_names = []

    def _create_parameter(**kwargs):
        parameter_names.append(kwargs["Name"])
        return aws_client.ssm.put_parameter(**kwargs)

    yield _create_parameter

    for name in parameter_names:
        aws_client.ssm.delete_parameter(Name=name)
1742

1743

1744
@pytest.fixture
def create_secret(aws_client):
    """Factory fixture creating SecretsManager secrets, force-deleted on teardown."""
    secret_arns = []

    def _create_secret(**kwargs):
        response = aws_client.secretsmanager.create_secret(**kwargs)
        secret_arns.append(response["ARN"])
        return response

    yield _create_secret

    for arn in secret_arns:
        aws_client.secretsmanager.delete_secret(SecretId=arn, ForceDeleteWithoutRecovery=True)
1757

1758

1759
# TODO Figure out how to make cert creation tests pass against AWS.
1760
#
1761
# We would like to have localstack tests to pass not just against localstack, but also against AWS to make sure
1762
# our emulation is correct. Unfortunately, with certificate creation there are some issues.
1763
#
1764
# In AWS newly created ACM certificates have to be validated either by email or by DNS. The latter is
1765
# by adding some CNAME records as requested by AWS in response to a certificate request.
1766
# For testing purposes the DNS one seems to be easier, at least as long as DNS is handled by the Route 53 AWS DNS service.
1767
#
1768
# The other possible option is to use IAM certificates instead of ACM ones. Those just have to be uploaded from files
1769
# created by openssl etc. Not sure if there are other issues after that.
1770
#
1771
# The third option might be having in AWS some certificates created in advance - so they do not require validation
1772
# and can be easily used in tests. The issue with such an approach is that for AppSync, for example, in order to
1773
# register a domain name (https://docs.aws.amazon.com/appsync/latest/APIReference/API_CreateDomainName.html),
1774
# the domain name in the API request has to match the domain name used in certificate creation. Which means that with
1775
# pre-created certificates we would have to use specific domain names instead of random ones.
1776
@pytest.fixture
def acm_request_certificate(aws_client_factory):
    """Factory fixture requesting ACM certificates, deleted again on teardown."""
    requested = []

    def factory(**kwargs) -> str:
        kwargs.setdefault("DomainName", f"test-domain-{short_uid()}.localhost.localstack.cloud")
        region_name = kwargs.pop("region_name", None)
        acm_client = aws_client_factory(region_name=region_name).acm
        response = acm_client.request_certificate(**kwargs)
        requested.append((response["CertificateArn"], region_name))
        return response

    yield factory

    # cleanup
    for certificate_arn, region_name in requested:
        try:
            aws_client_factory(region_name=region_name).acm.delete_certificate(
                CertificateArn=certificate_arn
            )
        except Exception as e:
            LOG.debug("error cleaning up certificate %s: %s", certificate_arn, e)
1801

1802

1803
# "Superuser" IAM policy document allowing every action on every resource;
# attached to auto-generated roles (e.g. by the `lambda_su_role` fixture below).
role_policy_su = {
    "Version": "2012-10-17",
    "Statement": [{"Effect": "Allow", "Action": ["*"], "Resource": ["*"]}],
}
1807

1808

1809
@pytest.fixture(scope="session")
def lambda_su_role(aws_client):
    """Session-scoped IAM role with superuser permissions for Lambda tests.

    Yields the role ARN; the policy is detached and both role and policy are
    deleted at session end.
    """
    role_name = f"lambda-autogenerated-{short_uid()}"
    role = aws_client.iam.create_role(
        RoleName=role_name, AssumeRolePolicyDocument=role_assume_policy
    )["Role"]
    policy_name = f"lambda-autogenerated-{short_uid()}"
    policy_arn = aws_client.iam.create_policy(
        PolicyName=policy_name, PolicyDocument=json.dumps(role_policy_su)
    )["Policy"]["Arn"]
    aws_client.iam.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)

    if is_aws_cloud():  # dirty but necessary: wait for IAM propagation
        time.sleep(10)

    yield role["Arn"]

    # Pass the callable plus kwargs to run_safe instead of calling the client
    # eagerly: `run_safe(f(...))` executed the API call *outside* run_safe, so
    # cleanup errors propagated out of the fixture instead of being swallowed.
    run_safe(aws_client.iam.detach_role_policy, RoleName=role_name, PolicyArn=policy_arn)
    run_safe(aws_client.iam.delete_role, RoleName=role_name)
    run_safe(aws_client.iam.delete_policy, PolicyArn=policy_arn)
1829

1830

1831
@pytest.fixture
def create_iam_role_and_attach_policy(aws_client):
    """
    Fixture that creates an IAM role with given role definition and predefined policy ARN.

    Use this fixture with AWS managed policies like 'AmazonS3ReadOnlyAccess' or 'AmazonKinesisFullAccess'.
    """
    roles = []

    # NOTE: annotation fixed from `dict[str, any]` - `any` is the builtin function,
    # not a type; `Any` (imported at the top of this file) is the correct annotation.
    def _inner(**kwargs: Any) -> str:
        """
        :param dict RoleDefinition: role definition document
        :param str PolicyArn: policy ARN
        :param str RoleName: role name (autogenerated if omitted)
        :return: role ARN
        """
        if "RoleName" not in kwargs:
            kwargs["RoleName"] = f"test-role-{short_uid()}"

        role = kwargs["RoleName"]
        role_policy = json.dumps(kwargs["RoleDefinition"])

        result = aws_client.iam.create_role(RoleName=role, AssumeRolePolicyDocument=role_policy)
        role_arn = result["Role"]["Arn"]

        policy_arn = kwargs["PolicyArn"]
        aws_client.iam.attach_role_policy(PolicyArn=policy_arn, RoleName=role)

        roles.append(role)
        return role_arn

    yield _inner

    for role in roles:
        try:
            aws_client.iam.delete_role(RoleName=role)
        except Exception as exc:
            LOG.debug("Error deleting IAM role '%s': %s", role, exc)
1869

1870

1871
@pytest.fixture
def create_iam_role_with_policy(aws_client):
    """
    Fixture that creates an IAM role with given role definition and policy definition.
    """
    roles = {}

    # NOTE: annotation fixed from `dict[str, any]` - `any` is the builtin function,
    # not a type; `Any` (imported at the top of this file) is the correct annotation.
    def _create_role_and_policy(**kwargs: Any) -> str:
        """
        :param dict RoleDefinition: role definition document
        :param dict PolicyDefinition: policy definition document
        :param str PolicyName: policy name (autogenerated if omitted)
        :param str RoleName: role name (autogenerated if omitted)
        :return: role ARN
        """
        if "RoleName" not in kwargs:
            kwargs["RoleName"] = f"test-role-{short_uid()}"
        role = kwargs["RoleName"]
        if "PolicyName" not in kwargs:
            kwargs["PolicyName"] = f"test-policy-{short_uid()}"
        policy = kwargs["PolicyName"]
        role_policy = json.dumps(kwargs["RoleDefinition"])

        result = aws_client.iam.create_role(RoleName=role, AssumeRolePolicyDocument=role_policy)
        role_arn = result["Role"]["Arn"]

        policy_document = json.dumps(kwargs["PolicyDefinition"])
        aws_client.iam.put_role_policy(
            RoleName=role, PolicyName=policy, PolicyDocument=policy_document
        )
        roles[role] = policy
        return role_arn

    yield _create_role_and_policy

    for role_name, policy_name in roles.items():
        try:
            aws_client.iam.delete_role_policy(RoleName=role_name, PolicyName=policy_name)
        except Exception as exc:
            LOG.debug("Error deleting IAM role policy '%s' '%s': %s", role_name, policy_name, exc)
        try:
            aws_client.iam.delete_role(RoleName=role_name)
        except Exception as exc:
            LOG.debug("Error deleting IAM role '%s': %s", role_name, exc)
1915

1916

1917
@pytest.fixture
def firehose_create_delivery_stream(wait_for_delivery_stream_ready, aws_client):
    """Factory fixture creating Firehose delivery streams, deleted again on teardown."""
    stream_names = []

    def _create_delivery_stream(**kwargs):
        kwargs.setdefault("DeliveryStreamName", f"test-delivery-stream-{short_uid()}")
        stream_name = kwargs["DeliveryStreamName"]
        response = aws_client.firehose.create_delivery_stream(**kwargs)
        stream_names.append(stream_name)
        # block until the stream is actually usable
        wait_for_delivery_stream_ready(stream_name)
        return response

    yield _create_delivery_stream

    for stream_name in stream_names:
        try:
            aws_client.firehose.delete_delivery_stream(DeliveryStreamName=stream_name)
        except Exception:
            LOG.info("Failed to delete delivery stream %s", stream_name)
1937

1938

1939
@pytest.fixture
def ses_configuration_set(aws_client):
    """Factory fixture creating SES configuration sets, deleted again on teardown."""
    created_names = []

    def factory(name: str) -> None:
        aws_client.ses.create_configuration_set(ConfigurationSet={"Name": name})
        created_names.append(name)

    yield factory

    for name in created_names:
        aws_client.ses.delete_configuration_set(ConfigurationSetName=name)
1955

1956

1957
@pytest.fixture
def ses_configuration_set_sns_event_destination(aws_client):
    """Factory fixture adding SNS event destinations to SES configuration sets, removed on teardown."""
    created = []

    def factory(config_set_name: str, event_destination_name: str, topic_arn: str) -> None:
        aws_client.ses.create_configuration_set_event_destination(
            ConfigurationSetName=config_set_name,
            EventDestination={
                "Name": event_destination_name,
                "Enabled": True,
                "MatchingEventTypes": ["send", "bounce", "delivery", "open", "click"],
                "SNSDestination": {"TopicARN": topic_arn},
            },
        )
        created.append((config_set_name, event_destination_name))

    yield factory

    for config_set_name, destination_name in created:
        aws_client.ses.delete_configuration_set_event_destination(
            ConfigurationSetName=config_set_name,
            EventDestinationName=destination_name,
        )
1982

1983

1984
@pytest.fixture
def ses_email_template(aws_client):
    """Factory fixture creating SES email templates, deleted again on teardown."""
    template_names = []

    def factory(name: str, contents: str, subject: str = None):
        # The default subject used to be computed in the signature
        # (f"Email template {short_uid()}"), which Python evaluates only once at
        # import time - every caller omitting `subject` shared the same
        # "unique" value. Compute it per call instead.
        if subject is None:
            subject = f"Email template {short_uid()}"
        aws_client.ses.create_template(
            Template={
                "TemplateName": name,
                "SubjectPart": subject,
                "TextPart": contents,
            }
        )
        template_names.append(name)

    yield factory

    for template_name in template_names:
        aws_client.ses.delete_template(TemplateName=template_name)
2002

2003

2004
@pytest.fixture
def ses_verify_identity(aws_client):
    """Factory fixture verifying SES email identities, deleted again on teardown."""
    identities = []

    def factory(email_address: str) -> None:
        aws_client.ses.verify_email_identity(EmailAddress=email_address)
        # track the identity so teardown can delete it; previously nothing was
        # appended here, so the cleanup loop below never ran
        identities.append(email_address)

    yield factory

    for identity in identities:
        aws_client.ses.delete_identity(Identity=identity)
2015

2016

2017
@pytest.fixture
def setup_sender_email_address(ses_verify_identity):
    """
    If the test is running against AWS then assume the email address passed is already
    verified, and passes the given email address through. Otherwise, it generates one random
    email address and verify them.
    """

    def inner(sender_email_address: str | None = None) -> str:
        if not is_aws_cloud():
            # overwrite the given parameters with localstack specific ones
            sender_email_address = f"sender-{short_uid()}@example.com"
            ses_verify_identity(sender_email_address)
        elif sender_email_address is None:
            raise ValueError(
                "sender_email_address must be specified to run this test against AWS"
            )
        return sender_email_address

    return inner
2039

2040

2041
@pytest.fixture
def ec2_create_security_group(aws_client):
    ec2_sgs = []

    def factory(ports=None, ip_protocol: str = "tcp", **kwargs):
        """
        Create the security group and authorize the security group ingress.

        :param ports: list of ports to be authorized for the ingress rule;
            if None or empty, no ingress rule is created.
        :param ip_protocol: the ip protocol for the permissions (tcp by default)
        """
        if "GroupName" not in kwargs:
            # FIXME: This will fail against AWS since the sg prefix is not valid for GroupName
            # > "Group names may not be in the format sg-*".
            kwargs["GroupName"] = f"sg-{short_uid()}"
        # Making sure the call to CreateSecurityGroup gets the right arguments
        _args = select_from_typed_dict(CreateSecurityGroupRequest, kwargs)
        security_group = aws_client.ec2.create_security_group(**_args)
        security_group_id = security_group["GroupId"]

        permissions = [
            {
                "FromPort": port,
                "IpProtocol": ip_protocol,
                "IpRanges": [{"CidrIp": "0.0.0.0/0"}],
                "ToPort": port,
            }
            for port in ports or []
        ]
        # AuthorizeSecurityGroupIngress rejects an empty IpPermissions list, so
        # only call it when ports were actually provided (fixes the old FIXME).
        if permissions:
            aws_client.ec2.authorize_security_group_ingress(
                GroupId=security_group_id,
                IpPermissions=permissions,
            )

        ec2_sgs.append(security_group_id)
        return security_group

    yield factory

    for sg_group_id in ec2_sgs:
        try:
            aws_client.ec2.delete_security_group(GroupId=sg_group_id)
        except Exception as e:
            LOG.debug("Error cleaning up EC2 security group: %s, %s", sg_group_id, e)
2086

2087

2088
@pytest.fixture
def ec2_create_vpc_endpoint(aws_client):
    """Factory fixture creating VPC endpoints and waiting for them to become available."""
    endpoint_ids = []

    def _create(**kwargs: Unpack[CreateVpcEndpointRequest]) -> VpcEndpoint:
        created = aws_client.ec2.create_vpc_endpoint(**kwargs)
        endpoint_id = created["VpcEndpoint"]["VpcEndpointId"]
        endpoint_ids.append(endpoint_id)

        def _check_available() -> VpcEndpoint:
            described = aws_client.ec2.describe_vpc_endpoints(VpcEndpointIds=[endpoint_id])
            details = described["VpcEndpoints"][0]
            assert details["State"] == "available"
            return details

        return retry(_check_available, retries=30, sleep=5 if is_aws_cloud() else 1)

    yield _create

    try:
        aws_client.ec2.delete_vpc_endpoints(VpcEndpointIds=endpoint_ids)
    except Exception as e:
        LOG.error("Error cleaning up VPC endpoint: %s, %s", endpoint_ids, e)

    def wait_for_endpoint_deleted():
        try:
            described = aws_client.ec2.describe_vpc_endpoints(VpcEndpointIds=endpoint_ids)
            remaining = described["VpcEndpoints"]
            assert not remaining or all(ep["State"] == "Deleted" for ep in remaining)
        except botocore.exceptions.ClientError:
            pass

    # the vpc can't be deleted if an endpoint exists
    if is_aws_cloud():
        retry(wait_for_endpoint_deleted, retries=30, sleep=10 if is_aws_cloud() else 1)
2125

2126

2127
@pytest.fixture
def cleanups():
    """Collect cleanup callables during a test; they run in reverse order on teardown."""
    callbacks = []

    yield callbacks

    for callback in reversed(callbacks):
        try:
            callback()
        except Exception as e:
            LOG.warning("Failed to execute cleanup", exc_info=e)
2138

2139

2140
@pytest.fixture(scope="session")
def account_id(aws_client):
    """Return the account id of the primary test client, querying STS when available."""
    if not (is_aws_cloud() or is_api_enabled("sts")):
        return TEST_AWS_ACCOUNT_ID
    return aws_client.sts.get_caller_identity()["Account"]
2146

2147

2148
@pytest.fixture(scope="session")
def region_name(aws_client):
    """Return the region of the primary test client, read from the STS client when available."""
    if not (is_aws_cloud() or is_api_enabled("sts")):
        return TEST_AWS_REGION_NAME
    return aws_client.sts.meta.region_name
2154

2155

2156
@pytest.fixture(scope="session")
def partition(region_name):
    """Return the partition derived from the primary test region."""
    return get_partition(region_name)
2159

2160

2161
@pytest.fixture(scope="session")
def secondary_account_id(secondary_aws_client):
    """Return the account id of the secondary test client, querying STS when available."""
    if not (is_aws_cloud() or is_api_enabled("sts")):
        return SECONDARY_TEST_AWS_ACCOUNT_ID
    return secondary_aws_client.sts.get_caller_identity()["Account"]
2167

2168

2169
@pytest.fixture(scope="session")
def secondary_region_name():
    """Return the statically configured secondary test region."""
    return SECONDARY_TEST_AWS_REGION_NAME
2172

2173

2174
@pytest.hookimpl
def pytest_collection_modifyitems(config: Config, items: list[Item]):
    """Skip `*only_localstack`-marked tests against AWS and tag snapshot-fixture tests."""
    skip_against_aws = pytest.mark.skipif(
        is_aws_cloud(),
        reason="test only applicable if run against localstack",
    )
    for item in items:
        for mark in item.iter_markers():
            if mark.name.endswith("only_localstack"):
                item.add_marker(skip_against_aws)
        if hasattr(item, "fixturenames") and "snapshot" in item.fixturenames:
            # add a marker that indicates that this test is snapshot validated
            # if it uses the snapshot fixture -> allows selecting only snapshot
            # validated tests in order to capture new snapshots for a whole
            # test file with "-m snapshot_validated"
            item.add_marker("snapshot_validated")
2190

2191

2192
@pytest.fixture
def sample_stores() -> AccountRegionBundle:
    """Return an AccountRegionBundle over a sample store that declares one
    attribute of each available scope (CrossAccountAttribute,
    CrossRegionAttribute and LocalAttribute) for store-behavior tests."""

    class SampleStore(BaseStore):
        CROSS_ACCOUNT_ATTR = CrossAccountAttribute(default=list)
        CROSS_REGION_ATTR = CrossRegionAttribute(default=list)
        region_specific_attr = LocalAttribute(default=list)

    # validate=False: "zzz" is not a real service name
    return AccountRegionBundle("zzz", SampleStore, validate=False)
2200

2201

2202
@pytest.fixture
def create_rest_apigw(aws_client_factory):
    """Factory fixture creating API Gateway REST APIs.

    Returns a factory yielding ``(api_id, api_name, root_resource_id)``. On
    teardown, each API is deleted first and any usage plans (plus their API
    keys) that referenced it afterwards - this order is required, as a usage
    plan cannot be deleted while one of its stages still exists.
    """
    rest_apis = []
    retry_boto_config = None
    if is_aws_cloud():
        retry_boto_config = botocore.config.Config(
            # Api gateway can throttle requests pretty heavily. Leading to potentially undeleted apis
            retries={"max_attempts": 10, "mode": "adaptive"}
        )

    def _create_apigateway_function(**kwargs):
        # optional per-call region; remaining kwargs go to create_rest_api
        client_region_name = kwargs.pop("region_name", None)
        apigateway_client = aws_client_factory(
            region_name=client_region_name, config=retry_boto_config
        ).apigateway
        kwargs.setdefault("name", f"api-{short_uid()}")

        response = apigateway_client.create_rest_api(**kwargs)
        api_id = response.get("id")
        rest_apis.append((api_id, client_region_name))

        return api_id, response.get("name"), response.get("rootResourceId")

    yield _create_apigateway_function

    for rest_api_id, _client_region_name in rest_apis:
        apigateway_client = aws_client_factory(
            region_name=_client_region_name,
            config=retry_boto_config,
        ).apigateway
        # First, retrieve the usage plans associated with the REST API
        usage_plan_ids = []
        usage_plans = apigateway_client.get_usage_plans()
        for item in usage_plans.get("items", []):
            api_stages = item.get("apiStages", [])
            usage_plan_ids.extend(
                item.get("id") for api_stage in api_stages if api_stage.get("apiId") == rest_api_id
            )

        # Then delete the API, as you can't delete the UsagePlan if a stage is associated with it
        with contextlib.suppress(Exception):
            apigateway_client.delete_rest_api(restApiId=rest_api_id)

        # finally delete the usage plans and the API Keys linked to it
        for usage_plan_id in usage_plan_ids:
            usage_plan_keys = apigateway_client.get_usage_plan_keys(usagePlanId=usage_plan_id)
            for key in usage_plan_keys.get("items", []):
                apigateway_client.delete_api_key(apiKey=key["id"])
            apigateway_client.delete_usage_plan(usagePlanId=usage_plan_id)
2251

2252

2253
@pytest.fixture
def create_rest_apigw_openapi(aws_client_factory):
    """Factory fixture importing REST APIs from OpenAPI definitions, deleted on teardown."""
    imported_apis = []

    def _create_apigateway_function(**kwargs):
        region = kwargs.pop("region_name", None)
        client = aws_client_factory(region_name=region).apigateway
        response = client.import_rest_api(**kwargs)
        api_id = response.get("id")
        imported_apis.append((api_id, region))
        return api_id, response

    yield _create_apigateway_function

    for api_id, region in imported_apis:
        with contextlib.suppress(Exception):
            aws_client_factory(region_name=region).apigateway.delete_rest_api(restApiId=api_id)
2272

2273

2274
@pytest.fixture
def assert_host_customisation(monkeypatch):
    """Patch LOCALSTACK_HOST to a known value and return an asserter for URL host customisation."""
    localstack_host = "foo.bar"
    monkeypatch.setattr(
        config, "LOCALSTACK_HOST", config.HostAndPort(host=localstack_host, port=8888)
    )

    def asserter(
        url: str,
        *,
        custom_host: str | None = None,
    ):
        if custom_host is None:
            # no customisation: the patched default host must appear
            assert localstack_host in url, f"Could not find `{localstack_host}` in `{url}`"
        else:
            assert custom_host in url, f"Could not find `{custom_host}` in `{url}`"
            assert localstack_host not in url

    yield asserter
2294

2295

2296
@pytest.fixture
def echo_http_server(httpserver: HTTPServer):
    """Spins up a local HTTP echo server and returns the endpoint URL"""

    def _echo(request: Request) -> Response:
        body_json = None
        if request.is_json:
            with contextlib.suppress(ValueError):
                body_json = json.loads(request.data)
        echo_payload = {
            "data": request.data or "{}",
            "headers": dict(request.headers),
            "url": request.url,
            "method": request.method,
            "json": body_json,
        }
        return Response(json.dumps(json_safe(echo_payload)), status=200)

    httpserver.expect_request("").respond_with_handler(_echo)
    return httpserver.url_for("/")
2319

2320

2321
@pytest.fixture
def echo_http_server_post(echo_http_server):
    """
    Returns an HTTP echo server URL for POST requests that work both locally and for parity tests (against real AWS)
    """
    if not is_aws_cloud():
        return f"{echo_http_server}post"
    return f"{PUBLIC_HTTP_ECHO_SERVER_URL}/post"
2330

2331

2332
def create_policy_doc(effect: str, actions: list, resource=None) -> dict:
    """Build an IAM policy document with a single statement for the given effect and actions."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # TODO statement ids have to be alphanumeric [0-9A-Za-z], write a test for it
                "Sid": f"s{short_uid()}",
                "Effect": effect,
                "Action": ensure_list(actions),
                "Resource": resource or "*",
            }
        ],
    }
2347

2348

2349
@pytest.fixture
def create_policy_generated_document(create_policy):
    """Factory fixture creating an IAM policy from a generated document; returns the policy ARN."""

    def _create_policy_with_doc(effect, actions, policy_name=None, resource=None, iam_client=None):
        name = policy_name or f"p-{short_uid()}"
        document = create_policy_doc(effect, actions, resource=resource)
        response = create_policy(
            PolicyName=name, PolicyDocument=json.dumps(document), iam_client=iam_client
        )
        return response["Policy"]["Arn"]

    return _create_policy_with_doc
2361

2362

2363
@pytest.fixture
def create_role_with_policy(create_role, create_policy_generated_document, aws_client):
    """Factory fixture: create an IAM role plus a permission policy.

    With ``attach=True`` a managed policy is created and attached; otherwise the
    policy is put inline on the role. Returns ``(role_name, role_arn)``.
    """

    def _create_role_with_policy(
        effect, actions, assume_policy_doc, resource=None, attach=True, iam_client=None
    ):
        client = iam_client or aws_client.iam

        role_name = f"role-{short_uid()}"
        role_arn = create_role(
            RoleName=role_name, AssumeRolePolicyDocument=assume_policy_doc, iam_client=client
        )["Role"]["Arn"]
        policy_name = f"p-{short_uid()}"

        if attach:
            # create a managed policy and attach it to the role
            policy_arn = create_policy_generated_document(
                effect, actions, policy_name=policy_name, resource=resource, iam_client=client
            )
            client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
        else:
            # put the policy inline on the role
            document = json.dumps(create_policy_doc(effect, actions, resource=resource))
            client.put_role_policy(
                RoleName=role_name, PolicyName=policy_name, PolicyDocument=document
            )

        return role_name, role_arn

    return _create_role_with_policy
2394

2395

2396
@pytest.fixture
def create_user_with_policy(create_policy_generated_document, create_user, aws_client):
    """Factory fixture: create an IAM user with an attached generated policy and access keys.

    Returns ``(username, access_key)`` where ``access_key`` is the ``AccessKey``
    structure from ``create_access_key``.
    """

    def _create_user_with_policy(effect, actions, resource=None):
        policy_arn = create_policy_generated_document(effect, actions, resource=resource)
        username = f"user-{short_uid()}"
        create_user(UserName=username)
        iam = aws_client.iam
        iam.attach_user_policy(UserName=username, PolicyArn=policy_arn)
        access_key = iam.create_access_key(UserName=username)["AccessKey"]
        return username, access_key

    return _create_user_with_policy
2407

2408

2409
@pytest.fixture()
def register_extension(s3_bucket, aws_client):
    """Factory fixture: register a CloudFormation extension (resource/module/hook) from a local artifact.

    The artifact is uploaded to the ``s3_bucket`` fixture's bucket, registered via
    ``register_type``, and the new version is set as the default. All registered
    extensions are deregistered on teardown (every version, then the type itself).
    """
    cfn_client = aws_client.cloudformation
    # ARNs of registered extensions, collected for teardown
    extensions_arns = []

    def _register(extension_name, extension_type, artifact_path):
        bucket = s3_bucket
        key = f"artifact-{short_uid()}"

        # the schema handler package must be fetched from S3 by CloudFormation
        aws_client.s3.upload_file(artifact_path, bucket, key)

        register_response = cfn_client.register_type(
            Type=extension_type,
            TypeName=extension_name,
            SchemaHandlerPackage=f"s3://{bucket}/{key}",
        )

        # registration is asynchronous - block until it completes
        registration_token = register_response["RegistrationToken"]
        cfn_client.get_waiter("type_registration_complete").wait(
            RegistrationToken=registration_token
        )

        describe_response = cfn_client.describe_type_registration(
            RegistrationToken=registration_token
        )

        extensions_arns.append(describe_response["TypeArn"])
        # make the freshly registered version the default so stacks pick it up
        cfn_client.set_type_default_version(Arn=describe_response["TypeVersionArn"])

        return describe_response

    yield _register

    for arn in extensions_arns:
        # deregister all individual versions first (best-effort), then the type ARN itself;
        # deregistering the type fails while non-default versions still exist
        versions = cfn_client.list_type_versions(Arn=arn)["TypeVersionSummaries"]
        for v in versions:
            try:
                cfn_client.deregister_type(Arn=v["Arn"])
            except Exception:
                continue
        cfn_client.deregister_type(Arn=arn)
2450

2451

2452
@pytest.fixture
def hosted_zone(aws_client):
    """Factory fixture: create Route53 hosted zones; all zones are deleted on teardown."""
    created_zone_ids = []

    def factory(**kwargs):
        # CallerReference must be unique per request; generate one if absent
        if "CallerReference" not in kwargs:
            kwargs["CallerReference"] = f"ref-{short_uid()}"
        response = aws_client.route53.create_hosted_zone(**kwargs)
        created_zone_ids.append(response["HostedZone"]["Id"])
        return response

    yield factory

    # delete in reverse creation order
    for zone_id in reversed(created_zone_ids):
        aws_client.route53.delete_hosted_zone(Id=zone_id)
2468

2469

2470
@pytest.fixture
def openapi_validate(monkeypatch):
    """Enable OpenAPI request and response validation for the duration of a test."""
    for flag in ("OPENAPI_VALIDATE_RESPONSE", "OPENAPI_VALIDATE_REQUEST"):
        monkeypatch.setattr(config, flag, "true")
2474

2475

2476
@pytest.fixture
def set_resource_custom_id():
    """Factory fixture: pin a custom ID for a resource; all pinned IDs are unset on teardown."""
    pinned_identifiers = []

    def _set_custom_id(resource_identifier: ResourceIdentifier, custom_id):
        localstack_id_manager.set_custom_id(
            resource_identifier=resource_identifier, custom_id=custom_id
        )
        pinned_identifiers.append(resource_identifier)

    yield _set_custom_id

    for identifier in pinned_identifiers:
        localstack_id_manager.unset_custom_id(identifier)
2490

2491

2492
###############################
2493
# Events (EventBridge) fixtures
2494
###############################
2495

2496

2497
@pytest.fixture
def events_create_event_bus(aws_client, region_name, account_id):
    """Factory fixture: create EventBridge event buses.

    On teardown each bus is cleaned up best-effort: first all rules (removing
    their targets beforehand), then any archives attached to the bus, and
    finally the bus itself. Failures are logged as warnings, never raised, so
    cleanup problems do not mask the actual test result.
    """
    event_bus_names = []

    def _create_event_bus(**kwargs):
        if "Name" not in kwargs:
            kwargs["Name"] = f"test-event-bus-{short_uid()}"

        response = aws_client.events.create_event_bus(**kwargs)
        event_bus_names.append(kwargs["Name"])
        return response

    yield _create_event_bus

    for event_bus_name in event_bus_names:
        try:
            response = aws_client.events.list_rules(EventBusName=event_bus_name)
            rules = [rule["Name"] for rule in response["Rules"]]

            # Delete all rules for the current event bus
            for rule in rules:
                try:
                    response = aws_client.events.list_targets_by_rule(
                        Rule=rule, EventBusName=event_bus_name
                    )
                    targets = [target["Id"] for target in response["Targets"]]

                    # Remove all targets for the current rule
                    # (a rule cannot be deleted while it still has targets)
                    if targets:
                        for target in targets:
                            aws_client.events.remove_targets(
                                Rule=rule, EventBusName=event_bus_name, Ids=[target]
                            )

                    aws_client.events.delete_rule(Name=rule, EventBusName=event_bus_name)
                except Exception as e:
                    LOG.warning("Failed to delete rule %s: %s", rule, e)

            # Delete archives for event bus
            # (a bus cannot be deleted while archives still reference it)
            event_source_arn = (
                f"arn:aws:events:{region_name}:{account_id}:event-bus/{event_bus_name}"
            )
            response = aws_client.events.list_archives(EventSourceArn=event_source_arn)
            archives = [archive["ArchiveName"] for archive in response["Archives"]]
            for archive in archives:
                try:
                    aws_client.events.delete_archive(ArchiveName=archive)
                except Exception as e:
                    LOG.warning("Failed to delete archive %s: %s", archive, e)

            aws_client.events.delete_event_bus(Name=event_bus_name)
        except Exception as e:
            LOG.warning("Failed to delete event bus %s: %s", event_bus_name, e)
2550

2551

2552
@pytest.fixture
def events_put_rule(aws_client):
    """Factory fixture around ``events.put_rule``.

    Created rules (with their targets) are removed on teardown, best-effort:
    failures are logged as warnings so they do not mask the test result.
    """
    created_rules = []

    def _put_rule(**kwargs):
        if "Name" not in kwargs:
            kwargs["Name"] = f"rule-{short_uid()}"

        response = aws_client.events.put_rule(**kwargs)
        created_rules.append((kwargs["Name"], kwargs.get("EventBusName", "default")))
        return response

    yield _put_rule

    for rule_name, bus_name in created_rules:
        try:
            targets = aws_client.events.list_targets_by_rule(
                Rule=rule_name, EventBusName=bus_name
            )["Targets"]

            # a rule can only be deleted after all of its targets are removed
            for target in targets:
                aws_client.events.remove_targets(
                    Rule=rule_name, EventBusName=bus_name, Ids=[target["Id"]]
                )

            aws_client.events.delete_rule(Name=rule_name, EventBusName=bus_name)
        except Exception as e:
            LOG.warning("Failed to delete rule %s: %s", rule_name, e)
2583

2584

2585
@pytest.fixture
def events_create_rule(aws_client):
    """Factory fixture: create an EventBridge rule via ``put_rule`` and return its ARN.

    ``Name`` is required in the kwargs; ``EventBusName``, ``EventPattern`` and
    ``ScheduleExpression`` are optional. All created rules (including their
    targets) are deleted on teardown.
    """
    rules = []

    def _create_rule(**kwargs):
        rule_name = kwargs["Name"]
        bus_name = kwargs.get("EventBusName", "")
        pattern = kwargs.get("EventPattern", {})
        schedule = kwargs.get("ScheduleExpression", "")
        rule_arn = aws_client.events.put_rule(
            Name=rule_name,
            EventBusName=bus_name,
            EventPattern=json.dumps(pattern),
            ScheduleExpression=schedule,
        )["RuleArn"]
        rules.append({"name": rule_name, "bus": bus_name})
        return rule_arn

    yield _create_rule

    for rule in rules:
        # best-effort cleanup, consistent with events_put_rule / events_create_event_bus:
        # a failing delete must not mask the actual test result
        try:
            targets = aws_client.events.list_targets_by_rule(
                Rule=rule["name"], EventBusName=rule["bus"]
            )["Targets"]

            # a rule can only be deleted after all of its targets are removed
            target_ids = [target["Id"] for target in targets]
            if target_ids:
                aws_client.events.remove_targets(
                    Rule=rule["name"], EventBusName=rule["bus"], Ids=target_ids
                )

            aws_client.events.delete_rule(Name=rule["name"], EventBusName=rule["bus"])
        except Exception as e:
            LOG.warning("Failed to delete rule %s: %s", rule["name"], e)
2617

2618

2619
@pytest.fixture
def sqs_as_events_target(aws_client, sqs_get_queue_arn):
    """
    Fixture that creates an SQS queue and sets it up as a target for EventBridge events.

    The inner factory returns ``(queue_url, queue_arn, queue_name)``; all created
    queues are deleted on teardown (best-effort) using the client that created them.
    """
    queue_urls = []

    def _sqs_as_events_target(
        queue_name: str | None = None, custom_aws_client=None
    ) -> tuple[str, str, str]:
        # NOTE: annotation fixed - the function returns three values, not two
        if not queue_name:
            queue_name = f"tests-queue-{short_uid()}"
        # allow cross-account/cross-region setups via a caller-supplied client
        if custom_aws_client:
            sqs_client = custom_aws_client.sqs
        else:
            sqs_client = aws_client.sqs
        queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]
        # remember the creating client so teardown deletes via the right account/region
        queue_urls.append((queue_url, sqs_client))
        queue_arn = sqs_get_queue_arn(queue_url)
        # grant the EventBridge service permission to deliver messages to this queue
        policy = {
            "Version": "2012-10-17",
            "Id": f"sqs-eventbridge-{short_uid()}",
            "Statement": [
                {
                    "Sid": f"SendMessage-{short_uid()}",
                    "Effect": "Allow",
                    "Principal": {"Service": "events.amazonaws.com"},
                    "Action": "sqs:SendMessage",
                    "Resource": queue_arn,
                }
            ],
        }
        sqs_client.set_queue_attributes(
            QueueUrl=queue_url, Attributes={"Policy": json.dumps(policy)}
        )
        return queue_url, queue_arn, queue_name

    yield _sqs_as_events_target

    for queue_url, sqs_client in queue_urls:
        try:
            sqs_client.delete_queue(QueueUrl=queue_url)
        except Exception as e:
            LOG.debug("error cleaning up queue %s: %s", queue_url, e)
2663

2664

2665
@pytest.fixture
def create_role_event_bus_source_to_bus_target(create_iam_role_with_policy):
    """Factory fixture: IAM role that EventBridge can assume to forward events bus-to-bus."""

    def _create_role_event_bus_to_bus():
        # trust policy: let the EventBridge service assume the role
        trust_policy = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {"Service": "events.amazonaws.com"},
                    "Action": "sts:AssumeRole",
                }
            ],
        }

        # permission policy: allow putting events onto any event bus
        permission_policy = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "",
                    "Effect": "Allow",
                    "Action": "events:PutEvents",
                    "Resource": "arn:aws:events:*:*:event-bus/*",
                }
            ],
        }

        return create_iam_role_with_policy(
            RoleDefinition=trust_policy,
            PolicyDefinition=permission_policy,
        )

    yield _create_role_event_bus_to_bus
2699

2700

2701
@pytest.fixture
def get_primary_secondary_client(
    aws_client_factory,
    secondary_aws_client_factory,
    region_name,
    secondary_region_name,
    account_id,
    secondary_account_id,
):
    def _get_primary_secondary_clients(cross_scenario: str):
        """
        Returns primary and secondary AWS clients based on the cross-scenario.
        :param cross_scenario: The scenario for cross-region or cross-account testing.
                               Options: "region", "account", "region_account"
                               account_region cross scenario is not supported by AWS
        :return: A dictionary containing primary and secondary AWS clients, and their respective region and account IDs.
        """
        if cross_scenario not in ["region", "account", "region_account"]:
            raise ValueError(f"cross_scenario {cross_scenario} not supported")

        primary_client = aws_client_factory(region_name=region_name)

        if cross_scenario == "region":
            # same account, different region
            effective_account = account_id
            effective_region = secondary_region_name
            secondary_client = aws_client_factory(region_name=secondary_region_name)
        elif cross_scenario == "account":
            # different account, same region
            effective_account = secondary_account_id
            effective_region = region_name
            secondary_client = secondary_aws_client_factory(region_name=region_name)
        else:
            # "region_account": different account and different region
            effective_account = secondary_account_id
            effective_region = secondary_region_name
            secondary_client = secondary_aws_client_factory(region_name=secondary_region_name)

        return {
            "primary_aws_client": primary_client,
            "secondary_aws_client": secondary_client,
            "secondary_region_name": effective_region,
            "secondary_account_id": effective_account,
        }

    return _get_primary_secondary_clients
2744

2745

2746
@pytest.fixture
def clean_up(
    aws_client,
):  # TODO: legacy clean up fixtures for eventbridge - remove and use individual fixtures for creating resources instead
    def _clean_up(
        bus_name=None,
        rule_name=None,
        target_ids=None,
        queue_url=None,
        log_group_name=None,
    ):
        """Best-effort deletion of EventBridge/SQS/CloudWatch Logs test resources.

        Every deletion is wrapped in ``call_safe``, so individual failures are
        swallowed and cleanup continues with the remaining resources. Deletion
        order matters: targets before rules, rules before the event bus.

        :param bus_name: event bus to delete (and to scope rule/target deletion to)
        :param rule_name: rule whose targets/definition should be removed
        :param target_ids: single target ID or list of IDs to remove from the rule
        :param queue_url: SQS queue to delete
        :param log_group_name: CloudWatch Logs group to delete (including its streams)
        """
        events_client = aws_client.events
        # only scope to a bus when one was given; otherwise operate on the default bus
        kwargs = {"EventBusName": bus_name} if bus_name else {}
        if target_ids:
            # accept a single ID or a list of IDs
            target_ids = target_ids if isinstance(target_ids, list) else [target_ids]
            call_safe(
                events_client.remove_targets,
                kwargs=dict(Rule=rule_name, Ids=target_ids, Force=True, **kwargs),
            )
        if rule_name:
            call_safe(events_client.delete_rule, kwargs=dict(Name=rule_name, Force=True, **kwargs))
        if bus_name:
            call_safe(events_client.delete_event_bus, kwargs={"Name": bus_name})
        if queue_url:
            sqs_client = aws_client.sqs
            call_safe(sqs_client.delete_queue, kwargs={"QueueUrl": queue_url})
        if log_group_name:
            logs_client = aws_client.logs

            def _delete_log_group():
                # delete all streams first, then the group itself
                log_streams = logs_client.describe_log_streams(logGroupName=log_group_name)
                for log_stream in log_streams["logStreams"]:
                    logs_client.delete_log_stream(
                        logGroupName=log_group_name, logStreamName=log_stream["logStreamName"]
                    )
                logs_client.delete_log_group(logGroupName=log_group_name)

            call_safe(_delete_log_group)

    yield _clean_up
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc