• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 17144436094

21 Aug 2025 11:28PM UTC coverage: 86.843% (-0.03%) from 86.876%
17144436094

push

github

web-flow
APIGW: internalize DeleteIntegrationResponse (#13046)

40 of 45 new or added lines in 1 file covered. (88.89%)

235 existing lines in 11 files now uncovered.

67068 of 77229 relevant lines covered (86.84%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.78
/localstack-core/localstack/testing/pytest/fixtures.py
1
import contextlib
1✔
2
import dataclasses
1✔
3
import json
1✔
4
import logging
1✔
5
import os
1✔
6
import re
1✔
7
import textwrap
1✔
8
import time
1✔
9
from typing import TYPE_CHECKING, Any, Callable, Optional, Unpack
1✔
10

11
import botocore.auth
1✔
12
import botocore.config
1✔
13
import botocore.credentials
1✔
14
import botocore.session
1✔
15
import pytest
1✔
16
from _pytest.config import Config
1✔
17
from _pytest.nodes import Item
1✔
18
from botocore.exceptions import ClientError
1✔
19
from botocore.regions import EndpointResolver
1✔
20
from pytest_httpserver import HTTPServer
1✔
21
from werkzeug import Request, Response
1✔
22

23
from localstack import config
1✔
24
from localstack.aws.api.cloudformation import CreateChangeSetInput, Parameter
1✔
25
from localstack.aws.api.ec2 import CreateSecurityGroupRequest, CreateVpcEndpointRequest, VpcEndpoint
1✔
26
from localstack.aws.connect import ServiceLevelClientFactory
1✔
27
from localstack.services.stores import (
1✔
28
    AccountRegionBundle,
29
    BaseStore,
30
    CrossAccountAttribute,
31
    CrossRegionAttribute,
32
    LocalAttribute,
33
)
34
from localstack.testing.aws.cloudformation_utils import load_template_file, render_template
1✔
35
from localstack.testing.aws.util import get_lambda_logs, is_aws_cloud
1✔
36
from localstack.testing.config import (
1✔
37
    SECONDARY_TEST_AWS_ACCOUNT_ID,
38
    SECONDARY_TEST_AWS_REGION_NAME,
39
    TEST_AWS_ACCOUNT_ID,
40
    TEST_AWS_REGION_NAME,
41
)
42
from localstack.utils import testutil
1✔
43
from localstack.utils.aws.arns import get_partition
1✔
44
from localstack.utils.aws.client import SigningHttpClient
1✔
45
from localstack.utils.aws.resources import create_dynamodb_table
1✔
46
from localstack.utils.bootstrap import is_api_enabled
1✔
47
from localstack.utils.collections import ensure_list, select_from_typed_dict
1✔
48
from localstack.utils.functions import call_safe, run_safe
1✔
49
from localstack.utils.http import safe_requests as requests
1✔
50
from localstack.utils.id_generator import ResourceIdentifier, localstack_id_manager
1✔
51
from localstack.utils.json import CustomEncoder, json_safe
1✔
52
from localstack.utils.net import wait_for_port_open
1✔
53
from localstack.utils.strings import short_uid, to_str
1✔
54
from localstack.utils.sync import ShortCircuitWaitException, poll_condition, retry, wait_until
1✔
55

56
LOG = logging.getLogger(__name__)
1✔
57

58
# URL of public HTTP echo server, used primarily for AWS parity/snapshot testing
59
PUBLIC_HTTP_ECHO_SERVER_URL = "http://httpbin.org"
1✔
60

61
WAITER_CHANGE_SET_CREATE_COMPLETE = "change_set_create_complete"
1✔
62
WAITER_STACK_CREATE_COMPLETE = "stack_create_complete"
1✔
63
WAITER_STACK_UPDATE_COMPLETE = "stack_update_complete"
1✔
64
WAITER_STACK_DELETE_COMPLETE = "stack_delete_complete"
1✔
65

66

67
if TYPE_CHECKING:
1✔
68
    from mypy_boto3_sqs import SQSClient
×
69
    from mypy_boto3_sqs.type_defs import MessageTypeDef
×
70

71

72
@pytest.fixture(scope="session")
def aws_client_no_retry(aws_client_factory):
    """
    Provide Boto clients whose retry behavior is disabled (a single attempt only).

    botocore docs: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#configuring-a-retry-mode

    Prefer this client when asserting on exceptions (i.e., with pytest.raises(...))
    or expected errors (e.g., status code 500), so retries do not slow the test
    down or make a time-bound error condition flaky.

    The legacy retry mode would otherwise retry the following errors, exceptions,
    and HTTP status codes:
    https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#legacy-retry-mode
    General socket/connection errors:
    * ConnectionError
    * ConnectionClosedError
    * ReadTimeoutError
    * EndpointConnectionError

    Service-side throttling/limit errors and exceptions:
    * Throttling
    * ThrottlingException
    * ThrottledException
    * RequestThrottledException
    * ProvisionedThroughputExceededException

    HTTP status codes: 429, 500, 502, 503, 504, and 509

    Hence, this client is not needed for a `ResourceNotFound` error (but it doesn't harm).
    """
    single_attempt = botocore.config.Config(retries={"max_attempts": 1})
    return aws_client_factory(config=single_attempt)
102

103

104
@pytest.fixture(scope="class")
def aws_http_client_factory(aws_session):
    """
    Factory for new ``SigningHttpClient`` instances backed by a configurable
    botocore request signer (default: ``SigV4QueryAuth``). Credentials are taken
    from the ``aws_session`` fixture, which transparently uses your global
    profile when TEST_TARGET=AWS_CLOUD, or test credentials when running against
    LocalStack.

    Example invocations

        client = aws_signing_http_client_factory("sqs")
        client.get("http://localhost:4566/000000000000/my-queue")

    or
        client = aws_signing_http_client_factory("dynamodb", signer_factory=SigV4Auth)
        client.post("...")
    """

    def factory(
        service: str,
        region: str = None,
        signer_factory: Callable[
            [botocore.credentials.Credentials, str, str], botocore.auth.BaseSigner
        ] = botocore.auth.SigV4QueryAuth,
        endpoint_url: str = None,
        aws_access_key_id: str = None,
        aws_secret_access_key: str = None,
    ):
        region = region or TEST_AWS_REGION_NAME

        # explicitly supplied keys take precedence over the session's credentials
        if aws_access_key_id or aws_secret_access_key:
            credentials = botocore.credentials.Credentials(
                access_key=aws_access_key_id, secret_key=aws_secret_access_key
            )
        else:
            credentials = aws_session.get_credentials()
        frozen = credentials.get_frozen_credentials()

        if not endpoint_url:
            if is_aws_cloud():
                # FIXME: this is a bit raw. we should probably re-use boto in a better way
                resolver: EndpointResolver = aws_session._session.get_component("endpoint_resolver")
                endpoint_url = "https://" + resolver.construct_endpoint(service, region)["hostname"]
            else:
                endpoint_url = config.internal_service_url()

        signer = signer_factory(frozen, service, region)
        return SigningHttpClient(signer, endpoint_url=endpoint_url)

    return factory
154

155

156
@pytest.fixture(scope="class")
def s3_vhost_client(aws_client_factory, region_name):
    """S3 client configured for virtual-host style bucket addressing."""
    vhost_config = botocore.config.Config(s3={"addressing_style": "virtual"})
    return aws_client_factory(config=vhost_config, region_name=region_name).s3
161

162

163
@pytest.fixture
def dynamodb_wait_for_table_active(aws_client):
    """Returns a waiter that polls (up to 30s) until a DynamoDB table is ACTIVE."""

    def wait_for_table_active(table_name: str, client=None):
        ddb = client or aws_client.dynamodb

        def _is_active():
            table = ddb.describe_table(TableName=table_name)["Table"]
            return table["TableStatus"] == "ACTIVE"

        poll_condition(_is_active, timeout=30)

    return wait_for_table_active
174

175

176
@pytest.fixture
def dynamodb_create_table_with_parameters(dynamodb_wait_for_table_active, aws_client):
    """Factory that creates DynamoDB tables from raw ``create_table`` kwargs,
    waits until they are ACTIVE, and deletes them again on teardown."""
    created_tables = []

    def factory(**kwargs):
        kwargs.setdefault("TableName", f"test-table-{short_uid()}")

        created_tables.append(kwargs["TableName"])
        response = aws_client.dynamodb.create_table(**kwargs)
        dynamodb_wait_for_table_active(kwargs["TableName"])
        return response

    yield factory

    # cleanup: a table has to be in ACTIVE state before deletion
    for table_name in created_tables:
        try:
            dynamodb_wait_for_table_active(table_name)
            aws_client.dynamodb.delete_table(TableName=table_name)
        except Exception as e:
            LOG.debug("error cleaning up table %s: %s", table_name, e)
199

200

201
@pytest.fixture
def dynamodb_create_table(dynamodb_wait_for_table_active, aws_client):
    """Factory around the ``create_dynamodb_table`` utility with auto-cleanup.

    Beware: the underlying utility function swallows exceptions raised during
    table creation.
    """
    created_tables = []

    def factory(**kwargs):
        kwargs["client"] = aws_client.dynamodb
        kwargs.setdefault("table_name", f"test-table-{short_uid()}")
        kwargs.setdefault("partition_key", "id")

        created_tables.append(kwargs["table_name"])
        return create_dynamodb_table(**kwargs)

    yield factory

    # cleanup: a table has to be in ACTIVE state before deletion
    for table_name in created_tables:
        try:
            dynamodb_wait_for_table_active(table_name)
            aws_client.dynamodb.delete_table(TableName=table_name)
        except Exception as e:
            LOG.debug("error cleaning up table %s: %s", table_name, e)
227

228

229
@pytest.fixture
def s3_create_bucket(s3_empty_bucket, aws_client):
    """Factory that creates S3 buckets; on teardown each bucket is emptied and
    then deleted."""
    created_buckets = []

    def factory(**kwargs) -> str:
        if "Bucket" not in kwargs:
            kwargs["Bucket"] = "test-bucket-%s" % short_uid()

        region = aws_client.s3.meta.region_name
        # outside us-east-1, S3 requires an explicit location constraint
        if "CreateBucketConfiguration" not in kwargs and region != "us-east-1":
            kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}

        aws_client.s3.create_bucket(**kwargs)
        created_buckets.append(kwargs["Bucket"])
        return kwargs["Bucket"]

    yield factory

    # cleanup
    for bucket in created_buckets:
        try:
            s3_empty_bucket(bucket)
            aws_client.s3.delete_bucket(Bucket=bucket)
        except Exception as e:
            LOG.debug("error cleaning up bucket %s: %s", bucket, e)
258

259

260
@pytest.fixture
def s3_create_bucket_with_client(s3_empty_bucket, aws_client):
    """Like ``s3_create_bucket``, but creates the bucket through a
    caller-provided S3 client; cleanup still uses the default ``aws_client``."""
    created_buckets = []

    def factory(s3_client, **kwargs) -> str:
        kwargs.setdefault("Bucket", f"test-bucket-{short_uid()}")

        response = s3_client.create_bucket(**kwargs)
        created_buckets.append(kwargs["Bucket"])
        return response

    yield factory

    # cleanup
    for bucket in created_buckets:
        try:
            s3_empty_bucket(bucket)
            aws_client.s3.delete_bucket(Bucket=bucket)
        except Exception as e:
            LOG.debug("error cleaning up bucket %s: %s", bucket, e)
281

282

283
@pytest.fixture
def s3_bucket(s3_create_bucket, aws_client) -> str:
    """A single auto-named S3 bucket with a region-appropriate location constraint."""
    region = aws_client.s3.meta.region_name
    if region == "us-east-1":
        return s3_create_bucket()
    return s3_create_bucket(CreateBucketConfiguration={"LocationConstraint": region})
290

291

292
@pytest.fixture
def s3_empty_bucket(aws_client):
    """
    Returns a factory that, given a bucket name, deletes all objects and all
    object versions (including delete markers) from the bucket.
    """

    # S3 DeleteObjects accepts at most 1000 keys per request
    _DELETE_BATCH_SIZE = 1000

    def _delete_in_batches(bucket_name: str, entries: list, **kwargs):
        # issue delete_objects calls in chunks of up to 1000 keys
        for i in range(0, len(entries), _DELETE_BATCH_SIZE):
            aws_client.s3.delete_objects(
                Bucket=bucket_name,
                Delete={"Objects": entries[i : i + _DELETE_BATCH_SIZE]},
                **kwargs,
            )

    # Boto resource would make this a straightforward task, but our internal client
    # does not support Boto resource.
    # fix: paginate listings and batch deletions so buckets with more than 1000
    # objects/versions are fully emptied (previously only the first page was handled)
    def factory(bucket_name: str):
        kwargs = {}
        try:
            # if object lock is enabled, governance-retained objects need a bypass flag
            aws_client.s3.get_object_lock_configuration(Bucket=bucket_name)
            kwargs["BypassGovernanceRetention"] = True
        except ClientError:
            pass

        # delete all current objects
        objects = []
        for page in aws_client.s3.get_paginator("list_objects_v2").paginate(Bucket=bucket_name):
            objects.extend({"Key": obj["Key"]} for obj in page.get("Contents", []))
        if objects:
            _delete_in_batches(bucket_name, objects, **kwargs)

        # delete all object versions and delete markers
        object_versions = []
        for page in aws_client.s3.get_paginator("list_object_versions").paginate(
            Bucket=bucket_name
        ):
            versions = page.get("Versions", [])
            versions.extend(page.get("DeleteMarkers", []))
            object_versions.extend(
                {"Key": obj["Key"], "VersionId": obj["VersionId"]} for obj in versions
            )
        if object_versions:
            _delete_in_batches(bucket_name, object_versions, **kwargs)

    yield factory
330

331

332
@pytest.fixture
def sqs_create_queue(aws_client):
    """Factory that creates SQS queues and deletes them on teardown."""
    created_queue_urls = []

    def factory(**kwargs):
        if "QueueName" not in kwargs:
            kwargs["QueueName"] = "test-queue-%s" % short_uid()

        queue_url = aws_client.sqs.create_queue(**kwargs)["QueueUrl"]
        created_queue_urls.append(queue_url)
        return queue_url

    yield factory

    # cleanup
    for queue_url in created_queue_urls:
        try:
            aws_client.sqs.delete_queue(QueueUrl=queue_url)
        except Exception as e:
            LOG.debug("error cleaning up queue %s: %s", queue_url, e)
354

355

356
@pytest.fixture
def sqs_receive_messages_delete(aws_client):
    """Returns a factory that receives messages from a queue, deletes them, and
    returns the JSON-decoded message bodies.

    Note: raises KeyError when the receive response contains no "Messages" key;
    callers such as ``sqs_receive_num_messages`` rely on that behavior.
    """

    def factory(
        queue_url: str,
        expected_messages: Optional[int] = None,
        wait_time: Optional[int] = 5,
    ):
        response = aws_client.sqs.receive_message(
            QueueUrl=queue_url,
            MessageAttributeNames=["All"],
            VisibilityTimeout=0,
            WaitTimeSeconds=wait_time,
        )
        messages = [json.loads(to_str(m["Body"])) for m in response["Messages"]]

        if expected_messages is not None:
            assert len(messages) == expected_messages

        for received in response["Messages"]:
            aws_client.sqs.delete_message(
                QueueUrl=queue_url, ReceiptHandle=received["ReceiptHandle"]
            )

        return messages

    return factory
385

386

387
@pytest.fixture
def sqs_receive_num_messages(sqs_receive_messages_delete):
    """Receive at least ``expected_messages`` messages within ``max_iterations``
    receive attempts, or fail with an AssertionError."""

    def factory(queue_url: str, expected_messages: int, max_iterations: int = 3):
        received = []
        for _ in range(max_iterations):
            try:
                batch = sqs_receive_messages_delete(queue_url, wait_time=5)
            except KeyError:
                # there were no messages
                continue
            received.extend(batch)

            if len(received) >= expected_messages:
                return received[:expected_messages]

        raise AssertionError(f"max iterations reached with {len(received)} messages received")

    return factory
405

406

407
@pytest.fixture
def sqs_collect_messages(aws_client):
    """Collects SQS messages from a given queue_url and deletes them by default.
    Example usage:
    messages = sqs_collect_messages(
         my_queue_url,
         expected=2,
         timeout=10,
         attribute_names=["All"],
         message_attribute_names=["All"],
    )
    """

    def factory(
        queue_url: str,
        expected: int,
        timeout: int,
        delete: bool = True,
        attribute_names: list[str] = None,
        message_attribute_names: list[str] = None,
        max_number_of_messages: int = 1,
        wait_time_seconds: int = 5,
        sqs_client: "SQSClient | None" = None,
    ) -> list["MessageTypeDef"]:
        sqs_client = sqs_client or aws_client.sqs
        collected = []

        def _receive():
            response = sqs_client.receive_message(
                QueueUrl=queue_url,
                # Maximum is 20 seconds. Performs long polling.
                WaitTimeSeconds=wait_time_seconds,
                # Maximum 10 messages
                MaxNumberOfMessages=max_number_of_messages,
                AttributeNames=attribute_names or [],
                MessageAttributeNames=message_attribute_names or [],
            )

            if messages := response.get("Messages"):
                collected.extend(messages)

                if delete:
                    for m in messages:
                        sqs_client.delete_message(
                            QueueUrl=queue_url, ReceiptHandle=m["ReceiptHandle"]
                        )

            return len(collected) >= expected

        if not poll_condition(_receive, timeout=timeout):
            # fix: the error message previously lacked its closing parenthesis
            raise TimeoutError(
                f"gave up waiting for messages (expected={expected}, actual={len(collected)})"
            )

        return collected

    yield factory
464

465

466
@pytest.fixture
def sqs_queue(sqs_create_queue):
    """A single auto-named SQS queue (deleted again on teardown)."""
    return sqs_create_queue()
469

470

471
@pytest.fixture
def sqs_get_queue_arn(aws_client) -> Callable:
    """Returns a callable that resolves a queue URL to the queue's ARN."""

    def _get_queue_arn(queue_url: str) -> str:
        attributes = aws_client.sqs.get_queue_attributes(
            QueueUrl=queue_url, AttributeNames=["QueueArn"]
        )["Attributes"]
        return attributes["QueueArn"]

    return _get_queue_arn
479

480

481
@pytest.fixture
def sqs_queue_exists(aws_client):
    def _queue_exists(queue_url: str) -> bool:
        """
        Checks whether a queue with the given queue URL exists.
        :param queue_url: the queue URL
        :return: true if the queue exists, false otherwise
        """
        queue_name = queue_url.split("/")[-1]
        try:
            result = aws_client.sqs.get_queue_url(QueueName=queue_name)
        except ClientError as e:
            if "NonExistentQueue" in e.response["Error"]["Code"]:
                return False
            raise
        return result.get("QueueUrl") == queue_url

    yield _queue_exists
498

499

500
@pytest.fixture
def sns_create_topic(aws_client):
    """Factory that creates SNS topics and deletes them on teardown."""
    created_topic_arns = []

    def _create_topic(**kwargs):
        if "Name" not in kwargs:
            kwargs["Name"] = "test-topic-%s" % short_uid()
        response = aws_client.sns.create_topic(**kwargs)
        created_topic_arns.append(response["TopicArn"])
        return response

    yield _create_topic

    for topic_arn in created_topic_arns:
        try:
            aws_client.sns.delete_topic(TopicArn=topic_arn)
        except Exception as e:
            LOG.debug("error cleaning up topic %s: %s", topic_arn, e)
518

519

520
@pytest.fixture
def sns_wait_for_topic_delete(aws_client):
    """Returns a waiter that polls (up to 30s) until the given SNS topic no
    longer exists."""

    def wait_for_topic_delete(topic_arn: str) -> None:
        def wait():
            try:
                aws_client.sns.get_topic_attributes(TopicArn=topic_arn)
                return False
            # fix: catch ClientError instead of bare Exception — other exception
            # types have no `.response` attribute and previously surfaced as a
            # confusing AttributeError instead of the original error
            except ClientError as e:
                if "NotFound" in e.response["Error"]["Code"]:
                    return True

                raise

        poll_condition(wait, timeout=30)

    return wait_for_topic_delete
536

537

538
@pytest.fixture
def sns_subscription(aws_client):
    """Factory wrapping ``sns.subscribe`` that unsubscribes on teardown.

    Requires 'TopicArn', 'Protocol', and 'Endpoint' kwargs; defaults
    ``ReturnSubscriptionArn`` to True so the full ARN is returned immediately.
    """
    subscription_arns = []

    def _create_sub(**kwargs):
        if kwargs.get("ReturnSubscriptionArn") is None:
            kwargs["ReturnSubscriptionArn"] = True

        response = aws_client.sns.subscribe(**kwargs)
        subscription_arns.append(response["SubscriptionArn"])
        return response

    yield _create_sub

    for sub_arn in subscription_arns:
        try:
            aws_client.sns.unsubscribe(SubscriptionArn=sub_arn)
        except Exception as e:
            LOG.debug("error cleaning up subscription %s: %s", sub_arn, e)
559

560

561
@pytest.fixture
def sns_topic(sns_create_topic, aws_client):
    """A single auto-named SNS topic, returned as its full attribute set."""
    topic_arn = sns_create_topic()["TopicArn"]
    return aws_client.sns.get_topic_attributes(TopicArn=topic_arn)
565

566

567
@pytest.fixture
def sns_allow_topic_sqs_queue(aws_client):
    """Returns a helper that grants an SNS topic permission to send messages to
    an SQS queue by setting the queue's access policy."""

    def _allow_sns_topic(sqs_queue_url, sqs_queue_arn, sns_topic_arn) -> None:
        # allow topic to write to sqs queue
        statement = {
            "Effect": "Allow",
            "Principal": {"Service": "sns.amazonaws.com"},
            "Action": "sqs:SendMessage",
            "Resource": sqs_queue_arn,
            "Condition": {"ArnEquals": {"aws:SourceArn": sns_topic_arn}},
        }
        policy = json.dumps({"Statement": [statement]})
        aws_client.sqs.set_queue_attributes(
            QueueUrl=sqs_queue_url,
            Attributes={"Policy": policy},
        )

    return _allow_sns_topic
591

592

593
@pytest.fixture
def sns_create_sqs_subscription(sns_allow_topic_sqs_queue, sqs_get_queue_arn, aws_client):
    """Factory that subscribes an SQS queue to an SNS topic, grants the topic
    write access to the queue, and unsubscribes on teardown."""
    subscription_arns = []

    def _factory(topic_arn: str, queue_url: str, **kwargs) -> dict[str, str]:
        queue_arn = sqs_get_queue_arn(queue_url)

        # connect sns topic to sqs
        subscription_arn = aws_client.sns.subscribe(
            TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn, **kwargs
        )["SubscriptionArn"]

        # allow topic to write to sqs queue
        sns_allow_topic_sqs_queue(
            sqs_queue_url=queue_url, sqs_queue_arn=queue_arn, sns_topic_arn=topic_arn
        )

        subscription_arns.append(subscription_arn)
        attributes = aws_client.sns.get_subscription_attributes(
            SubscriptionArn=subscription_arn
        )
        return attributes["Attributes"]

    yield _factory

    for arn in subscription_arns:
        try:
            aws_client.sns.unsubscribe(SubscriptionArn=arn)
        except Exception as e:
            LOG.error("error cleaning up subscription %s: %s", arn, e)
623

624

625
@pytest.fixture
def sns_create_http_endpoint(sns_create_topic, sns_subscription, aws_client):
    """Factory that starts a local HTTP test server and subscribes it to a fresh
    SNS topic; servers are stopped on teardown.

    This fixture can be used with manual setup to expose the HTTPServer fixture
    to AWS. One example is to use a service like localhost.run, set up a specific
    port to start the `HTTPServer(port=40000)` for example, and tunnel
    `localhost:40000` to a specific domain that you can manually return from this
    fixture.
    """
    servers = []

    def _create_http_endpoint(
        raw_message_delivery: bool = False,
    ) -> tuple[str, str, str, HTTPServer]:
        server = HTTPServer()
        server.start()
        servers.append(server)
        server.expect_request("/sns-endpoint").respond_with_data(status=200)
        endpoint_url = server.url_for("/sns-endpoint")
        wait_for_port_open(endpoint_url)

        topic_arn = sns_create_topic()["TopicArn"]
        subscription = sns_subscription(TopicArn=topic_arn, Protocol="http", Endpoint=endpoint_url)
        subscription_arn = subscription["SubscriptionArn"]

        # minimize retries and delays so tests observe delivery outcomes quickly
        delivery_policy = {
            "healthyRetryPolicy": {
                "minDelayTarget": 1,
                "maxDelayTarget": 1,
                "numRetries": 0,
                "numNoDelayRetries": 0,
                "numMinDelayRetries": 0,
                "numMaxDelayRetries": 0,
                "backoffFunction": "linear",
            },
            "sicklyRetryPolicy": None,
            "throttlePolicy": {"maxReceivesPerSecond": 1000},
            "guaranteed": False,
        }
        aws_client.sns.set_subscription_attributes(
            SubscriptionArn=subscription_arn,
            AttributeName="DeliveryPolicy",
            AttributeValue=json.dumps(delivery_policy),
        )

        if raw_message_delivery:
            aws_client.sns.set_subscription_attributes(
                SubscriptionArn=subscription_arn,
                AttributeName="RawMessageDelivery",
                AttributeValue="true",
            )

        return topic_arn, subscription_arn, endpoint_url, server

    yield _create_http_endpoint

    for server in servers:
        if server.is_running():
            server.stop()
679

680

681
@pytest.fixture
def route53_hosted_zone(aws_client):
    """Factory that creates Route53 hosted zones and deletes them on teardown.

    Auto-fills ``Name`` and ``CallerReference`` when not provided.
    """
    hosted_zone_ids = []

    def factory(**kwargs):
        if "Name" not in kwargs:
            kwargs["Name"] = f"www.{short_uid()}.com."
        if "CallerReference" not in kwargs:
            kwargs["CallerReference"] = f"caller-ref-{short_uid()}"
        # fix: forward all kwargs — previously only Name and CallerReference were
        # passed, silently dropping parameters like VPC or HostedZoneConfig
        response = aws_client.route53.create_hosted_zone(**kwargs)
        hosted_zone_ids.append(response["HostedZone"]["Id"])
        return response

    yield factory

    for zone_id in hosted_zone_ids:
        try:
            aws_client.route53.delete_hosted_zone(Id=zone_id)
        except Exception as e:
            LOG.debug("error cleaning up route53 HostedZone %s: %s", zone_id, e)
703

704

705
@pytest.fixture
def transcribe_create_job(s3_bucket, aws_client):
    """Factory that uploads an audio file to S3 and starts a transcription job
    referencing it; jobs are deleted on teardown."""
    job_names = []

    def _create_job(audio_file: str, params: Optional[dict[str, Any]] = None) -> str:
        s3_key = "test-clip.wav"
        params = params or {}

        params.setdefault("TranscriptionJobName", f"test-transcribe-{short_uid()}")
        params.setdefault("LanguageCode", "en-GB")
        params.setdefault("Media", {"MediaFileUri": f"s3://{s3_bucket}/{s3_key}"})

        # upload the test clip to the s3 bucket referenced by the default Media URI
        with open(audio_file, "rb") as f:
            aws_client.s3.upload_fileobj(f, s3_bucket, s3_key)

        response = aws_client.transcribe.start_transcription_job(**params)
        job_name = response["TranscriptionJob"]["TranscriptionJobName"]
        job_names.append(job_name)
        return job_name

    yield _create_job

    for job_name in job_names:
        with contextlib.suppress(ClientError):
            aws_client.transcribe.delete_transcription_job(TranscriptionJobName=job_name)
740

741

742
@pytest.fixture
def kinesis_create_stream(aws_client):
    """Factory that creates Kinesis streams and deletes them on teardown."""
    created_streams = []

    def _create_stream(**kwargs):
        kwargs.setdefault("StreamName", f"test-stream-{short_uid()}")
        aws_client.kinesis.create_stream(**kwargs)
        created_streams.append(kwargs["StreamName"])
        return kwargs["StreamName"]

    yield _create_stream

    for stream_name in created_streams:
        try:
            aws_client.kinesis.delete_stream(StreamName=stream_name, EnforceConsumerDeletion=True)
        except Exception as e:
            LOG.debug("error cleaning up kinesis stream %s: %s", stream_name, e)
760

761

762
@pytest.fixture
def wait_for_stream_ready(aws_client):
    """Returns a waiter that polls until a Kinesis stream is ACTIVE or UPDATING."""

    def _wait_for_stream_ready(stream_name: str):
        def is_stream_ready():
            description = aws_client.kinesis.describe_stream(StreamName=stream_name)
            status = description["StreamDescription"]["StreamStatus"]
            return status in ["ACTIVE", "UPDATING"]

        return poll_condition(is_stream_ready)

    return _wait_for_stream_ready
775

776

777
@pytest.fixture
def wait_for_delivery_stream_ready(aws_client):
    """Returns a waiter that polls until a Firehose delivery stream is ACTIVE."""

    def _wait_for_stream_ready(delivery_stream_name: str):
        def is_stream_ready():
            description = aws_client.firehose.describe_delivery_stream(
                DeliveryStreamName=delivery_stream_name
            )
            status = description["DeliveryStreamDescription"]["DeliveryStreamStatus"]
            return status == "ACTIVE"

        poll_condition(is_stream_ready)

    return _wait_for_stream_ready
792

793

794
@pytest.fixture
def wait_for_dynamodb_stream_ready(aws_client):
    """Returns a waiter that polls until a DynamoDB stream is ENABLED."""

    def _wait_for_stream_ready(stream_arn: str, client=None):
        def is_stream_ready():
            ddb_streams = client or aws_client.dynamodbstreams
            description = ddb_streams.describe_stream(StreamArn=stream_arn)
            return description["StreamDescription"]["StreamStatus"] == "ENABLED"

        return poll_condition(is_stream_ready)

    return _wait_for_stream_ready
805

806

807
@pytest.fixture()
def kms_create_key(aws_client_factory):
    """Factory that creates KMS keys (optionally in a given region) and
    schedules them for deletion on teardown."""
    created_keys = []

    def _create_key(region_name: str = None, **kwargs):
        if "Description" not in kwargs:
            kwargs["Description"] = f"test description - {short_uid()}"
        kms = aws_client_factory(region_name=region_name).kms
        key_metadata = kms.create_key(**kwargs)["KeyMetadata"]
        created_keys.append((region_name, key_metadata["KeyId"]))
        return key_metadata

    yield _create_key

    for region_name, key_id in created_keys:
        try:
            # 7 days is the shortest deletion window KMS allows
            aws_client_factory(region_name=region_name).kms.schedule_key_deletion(
                KeyId=key_id, PendingWindowInDays=7
            )
        except Exception as e:
            message = str(e)
            # Some tests schedule their keys for deletion themselves; don't log
            # when the key is already pending deletion.
            if not ("KMSInvalidStateException" in message and "is pending deletion" in message):
                LOG.debug("error cleaning up KMS key %s: %s", key_id, e)
836

837

838
@pytest.fixture()
def kms_replicate_key(aws_client_factory):
    """Factory fixture replicating multi-region KMS keys; schedules replica deletion on teardown."""
    replicated_keys = []

    def _replicate_key(region_from=None, **kwargs):
        target_region = kwargs.get("ReplicaRegion")
        replicated_keys.append((target_region, kwargs.get("KeyId")))
        return aws_client_factory(region_name=region_from).kms.replicate_key(**kwargs)

    yield _replicate_key

    for target_region, key_id in replicated_keys:
        try:
            # shortest amount of time you can schedule the deletion
            aws_client_factory(region_name=target_region).kms.schedule_key_deletion(
                KeyId=key_id, PendingWindowInDays=7
            )
        except Exception as e:
            LOG.debug("error cleaning up KMS key %s: %s", key_id, e)
# kms_create_key fixture is used here not just to be able to create aliases without a key specified,
860
# but also to make sure that kms_create_key gets executed before and teared down after kms_create_alias -
861
# to make sure that we clean up aliases before keys get cleaned up.
862
@pytest.fixture()
def kms_create_alias(kms_create_key, aws_client):
    """Factory fixture creating KMS aliases (and target keys on demand); deletes aliases on teardown."""
    created_aliases = []

    def _create_alias(**kwargs):
        if "AliasName" not in kwargs:
            kwargs["AliasName"] = f"alias/{short_uid()}"
        if "TargetKeyId" not in kwargs:
            kwargs["TargetKeyId"] = kms_create_key()["KeyId"]

        aws_client.kms.create_alias(**kwargs)
        alias_name = kwargs["AliasName"]
        created_aliases.append(alias_name)
        return alias_name

    yield _create_alias

    for alias_name in created_aliases:
        try:
            aws_client.kms.delete_alias(AliasName=alias_name)
        except Exception as e:
            LOG.debug("error cleaning up KMS alias %s: %s", alias_name, e)
@pytest.fixture()
def kms_create_grant(kms_create_key, aws_client):
    """Factory fixture creating KMS grants; retires all created grants on teardown."""
    created_grants = []

    # Just a random ARN, since KMS in LocalStack currently doesn't validate GranteePrincipal,
    # but some GranteePrincipal is required to create a grant.
    default_grantee_principal = (
        "arn:aws:kms:eu-central-1:123456789876:key/198a5a78-52c3-489f-ac70-b06a4d11027a"
    )

    def _create_grant(**kwargs):
        if "Operations" not in kwargs:
            kwargs["Operations"] = ["Decrypt", "Encrypt"]
        if "GranteePrincipal" not in kwargs:
            kwargs["GranteePrincipal"] = default_grantee_principal
        if "KeyId" not in kwargs:
            kwargs["KeyId"] = kms_create_key()["KeyId"]

        key_id = kwargs["KeyId"]
        grant_id = aws_client.kms.create_grant(**kwargs)["GrantId"]
        created_grants.append((grant_id, key_id))
        return grant_id, key_id

    yield _create_grant

    for grant_id, key_id in created_grants:
        try:
            aws_client.kms.retire_grant(GrantId=grant_id, KeyId=key_id)
        except Exception as e:
            LOG.debug("error cleaning up KMS grant %s: %s", grant_id, e)
@pytest.fixture
def kms_key(kms_create_key):
    """Provide the metadata of a single KMS key created via ``kms_create_key``."""
    key_metadata = kms_create_key()
    return key_metadata
921
@pytest.fixture
def kms_grant_and_key(kms_key, aws_client):
    """Create a Decrypt/Encrypt grant for the current caller on ``kms_key``.

    Returns a two-element list: the CreateGrant response and the key metadata.
    """
    caller_arn = aws_client.sts.get_caller_identity()["Arn"]
    grant = aws_client.kms.create_grant(
        KeyId=kms_key["KeyId"],
        GranteePrincipal=caller_arn,
        Operations=["Decrypt", "Encrypt"],
    )
    return [grant, kms_key]
935
@pytest.fixture
def opensearch_wait_for_cluster(aws_client):
    """Fixture returning a waiter that blocks until an OpenSearch domain is active."""

    def _wait_for_cluster(domain_name: str):
        def _domain_is_up():
            status = aws_client.opensearch.describe_domain(DomainName=domain_name)["DomainStatus"]
            return status["DomainProcessingStatus"] == "Active" and "Endpoint" in status

        # real AWS domains take much longer to start, so poll less aggressively there
        poll_kwargs = {"interval": 10} if is_aws_cloud() else {}
        assert poll_condition(_domain_is_up, timeout=25 * 60, **poll_kwargs), (
            f"could not start domain: {domain_name}"
        )

    return _wait_for_cluster
949
@pytest.fixture
def opensearch_create_domain(opensearch_wait_for_cluster, aws_client):
    """Factory fixture creating OpenSearch domains; deletes them on teardown."""
    created_domains = []

    def factory(**kwargs) -> str:
        if "DomainName" not in kwargs:
            kwargs["DomainName"] = f"test-domain-{short_uid()}"
        domain_name = kwargs["DomainName"]

        aws_client.opensearch.create_domain(**kwargs)
        # block until the cluster is actually reachable
        opensearch_wait_for_cluster(domain_name=domain_name)

        created_domains.append(domain_name)
        return domain_name

    yield factory

    # cleanup
    for domain_name in created_domains:
        try:
            aws_client.opensearch.delete_domain(DomainName=domain_name)
        except Exception as e:
            LOG.debug("error cleaning up domain %s: %s", domain_name, e)
974
@pytest.fixture
def opensearch_domain(opensearch_create_domain) -> str:
    """Provide the name of a freshly created OpenSearch domain."""
    domain_name = opensearch_create_domain()
    return domain_name
979
@pytest.fixture
def opensearch_endpoint(opensearch_domain, aws_client) -> str:
    """Return the HTTPS endpoint URL of the ``opensearch_domain`` fixture."""
    domain_status = aws_client.opensearch.describe_domain(DomainName=opensearch_domain)[
        "DomainStatus"
    ]
    assert "Endpoint" in domain_status
    return f"https://{domain_status['Endpoint']}"

986
@pytest.fixture
def opensearch_document_path(opensearch_endpoint, aws_client):
    """Index a sample document into the domain and return the document URL."""
    sample_document = {
        "first_name": "Boba",
        "last_name": "Fett",
        "age": 41,
        "about": "I'm just a simple man, trying to make my way in the universe.",
        "interests": ["mandalorian armor", "tusken culture"],
    }
    document_path = f"{opensearch_endpoint}/bountyhunters/_doc/1"
    response = requests.put(
        document_path,
        data=json.dumps(sample_document),
        headers={"content-type": "application/json", "Accept-encoding": "identity"},
    )
    assert response.status_code == 201, f"could not create document at: {document_path}"
    return document_path
# Cleanup fixtures
1006
@pytest.fixture
def cleanup_stacks(aws_client):
    """Fixture returning a helper that deletes CloudFormation stacks and waits for deletion."""

    def _cleanup_stacks(stacks: list[str]) -> None:
        for stack_name in ensure_list(stacks):
            try:
                aws_client.cloudformation.delete_stack(StackName=stack_name)
                aws_client.cloudformation.get_waiter("stack_delete_complete").wait(
                    StackName=stack_name
                )
            except Exception:
                LOG.debug("Failed to cleanup stack '%s'", stack_name)

    return _cleanup_stacks
1020
@pytest.fixture
def cleanup_changesets(aws_client):
    """Fixture returning a helper that deletes CloudFormation change sets, best-effort."""

    def _cleanup_changesets(changesets: list[str]) -> None:
        for changeset_name in ensure_list(changesets):
            try:
                aws_client.cloudformation.delete_change_set(ChangeSetName=changeset_name)
            except Exception:
                LOG.debug("Failed to cleanup changeset '%s'", changeset_name)

    return _cleanup_changesets
# Helpers for Cfn
1034

1035

1036
# TODO: exports(!)
@dataclasses.dataclass(frozen=True)
class DeployResult:
    # Result of a stack deployment performed via the deploy_cfn_template fixture.
    # identifiers of the executed change set and the deployed stack
    change_set_id: str
    stack_id: str
    stack_name: str
    change_set_name: str
    # stack outputs, mapped as OutputKey -> OutputValue
    outputs: dict[str, str]

    # callable that deletes the stack and waits for deletion to complete
    destroy: Callable[[], None]
1048
class StackDeployError(Exception):
    """Raised when a CloudFormation stack fails to reach its target state.

    Carries the ``describe_stacks`` result and the stack events so the failure
    can be rendered with full context in the exception message.
    """

    def __init__(self, describe_res: dict, events: list[dict]):
        self.describe_result = describe_res
        self.events = events

        encoded_describe_output = json.dumps(self.describe_result, cls=CustomEncoder)
        if config.CFN_VERBOSE_ERRORS:
            msg = f"Describe output:\n{encoded_describe_output}\nEvents:\n{self.format_events(events)}"
        else:
            msg = f"Describe output:\n{encoded_describe_output}\nFailing resources:\n{self.format_events(events)}"

        super().__init__(msg)

    def format_events(self, events: list[dict]) -> str:
        """Render events in chronological order; only failed resources unless verbose."""
        chronological_events = sorted(events, key=lambda event: event["Timestamp"])
        return "\n".join(
            self.format_event(event)
            for event in chronological_events
            if event["ResourceStatus"].endswith("FAILED") or config.CFN_VERBOSE_ERRORS
        )

    @staticmethod
    def format_event(event: dict) -> str:
        """Render a single stack event as a one-line summary."""
        reason = event.get("ResourceStatusReason")
        if not reason:
            return f"- {event['LogicalResourceId']} ({event['ResourceType']}) -> {event['ResourceStatus']}"
        # collapse multi-line reasons onto a single line
        reason = reason.replace("\n", "; ")
        return f"- {event['LogicalResourceId']} ({event['ResourceType']}) -> {event['ResourceStatus']} ({reason})"
1080
@pytest.fixture
def deploy_cfn_template(
    aws_client: ServiceLevelClientFactory,
):
    # Records (stack_id, destroy callback) for every stack deployed through this
    # fixture so that all of them can be torn down after the test.
    state: list[tuple[str, Callable]] = []

    def _deploy(
        *,
        is_update: Optional[bool] = False,
        stack_name: Optional[str] = None,
        change_set_name: Optional[str] = None,
        template: Optional[str] = None,
        template_path: Optional[str | os.PathLike] = None,
        template_mapping: Optional[dict[str, Any]] = None,
        parameters: Optional[dict[str, str]] = None,
        role_arn: Optional[str] = None,
        max_wait: Optional[int] = None,
        delay_between_polls: Optional[int] = 2,
        custom_aws_client: Optional[ServiceLevelClientFactory] = None,
        raw_parameters: Optional[list[Parameter]] = None,
    ) -> DeployResult:
        """Deploy (or update) a CloudFormation stack via a change set and wait
        for completion.

        Either ``template`` (a template body) or ``template_path`` (a file to
        load) must be given; the template is rendered with ``template_mapping``
        before deployment. ``parameters`` takes precedence over
        ``raw_parameters``. Raises ``StackDeployError`` when the stack does not
        reach its target state within ``max_wait`` seconds.
        """
        if is_update:
            assert stack_name
        stack_name = stack_name or f"stack-{short_uid()}"
        change_set_name = change_set_name or f"change-set-{short_uid()}"

        if max_wait is None:
            # real AWS deployments can take much longer than LocalStack ones
            max_wait = 1800 if is_aws_cloud() else 180

        if template_path is not None:
            template = load_template_file(template_path)
            if template is None:
                raise RuntimeError(f"Could not find file {os.path.realpath(template_path)}")
        template_rendered = render_template(template, **(template_mapping or {}))

        kwargs = CreateChangeSetInput(
            StackName=stack_name,
            ChangeSetName=change_set_name,
            TemplateBody=template_rendered,
            Capabilities=["CAPABILITY_AUTO_EXPAND", "CAPABILITY_IAM", "CAPABILITY_NAMED_IAM"],
            ChangeSetType=("UPDATE" if is_update else "CREATE"),
        )
        kwargs["Parameters"] = []
        if parameters:
            kwargs["Parameters"] = [
                Parameter(ParameterKey=k, ParameterValue=v) for (k, v) in parameters.items()
            ]
        elif raw_parameters:
            kwargs["Parameters"] = raw_parameters

        if role_arn is not None:
            kwargs["RoleARN"] = role_arn

        # allow deploying with a secondary account/region client
        cfn_aws_client = custom_aws_client if custom_aws_client is not None else aws_client

        response = cfn_aws_client.cloudformation.create_change_set(**kwargs)

        change_set_id = response["Id"]
        stack_id = response["StackId"]

        cfn_aws_client.cloudformation.get_waiter(WAITER_CHANGE_SET_CREATE_COMPLETE).wait(
            ChangeSetName=change_set_id
        )
        cfn_aws_client.cloudformation.execute_change_set(ChangeSetName=change_set_id)
        stack_waiter = cfn_aws_client.cloudformation.get_waiter(
            WAITER_STACK_UPDATE_COMPLETE if is_update else WAITER_STACK_CREATE_COMPLETE
        )

        try:
            stack_waiter.wait(
                StackName=stack_id,
                WaiterConfig={
                    "Delay": delay_between_polls,
                    "MaxAttempts": max_wait / delay_between_polls,
                },
            )
        except botocore.exceptions.WaiterError as e:
            # surface the describe output and stack events for easier debugging
            raise StackDeployError(
                cfn_aws_client.cloudformation.describe_stacks(StackName=stack_id)["Stacks"][0],
                cfn_aws_client.cloudformation.describe_stack_events(StackName=stack_id)[
                    "StackEvents"
                ],
            ) from e

        describe_stack_res = cfn_aws_client.cloudformation.describe_stacks(StackName=stack_id)[
            "Stacks"
        ][0]
        outputs = describe_stack_res.get("Outputs", [])

        mapped_outputs = {o["OutputKey"]: o.get("OutputValue") for o in outputs}

        def _destroy_stack():
            # delete the stack and block until deletion has completed
            cfn_aws_client.cloudformation.delete_stack(StackName=stack_id)
            cfn_aws_client.cloudformation.get_waiter(WAITER_STACK_DELETE_COMPLETE).wait(
                StackName=stack_id,
                WaiterConfig={
                    "Delay": delay_between_polls,
                    "MaxAttempts": max_wait / delay_between_polls,
                },
            )

        state.append((stack_id, _destroy_stack))

        return DeployResult(
            change_set_id, stack_id, stack_name, change_set_name, mapped_outputs, _destroy_stack
        )

    yield _deploy

    # delete the stacks in the reverse order they were created in case of inter-stack dependencies
    for stack_id, teardown in state[::-1]:
        try:
            teardown()
        except Exception as e:
            LOG.debug("Failed cleaning up stack stack_id=%s: %s", stack_id, e)
1197
@pytest.fixture
def is_change_set_created_and_available(aws_client):
    """Predicate factory: change set created successfully and executable."""

    def _is_change_set_created_and_available(change_set_id: str):
        def _inner():
            change_set = aws_client.cloudformation.describe_change_set(ChangeSetName=change_set_id)
            # TODO: CREATE_FAILED should also not lead to further retries
            is_created = change_set.get("Status") == "CREATE_COMPLETE"
            is_available = change_set.get("ExecutionStatus") == "AVAILABLE"
            return is_created and is_available

        return _inner

    return _is_change_set_created_and_available
1213
@pytest.fixture
def is_change_set_failed_and_unavailable(aws_client):
    """Predicate factory: change set creation FAILED and it is not executable.

    The inner helpers were previously named ``_is_change_set_created_and_available``
    (copied from the sibling fixture); renamed to reflect what they actually check.
    The fixture's external interface is unchanged.
    """

    def _is_change_set_failed_and_unavailable(change_set_id: str):
        def _inner():
            change_set = aws_client.cloudformation.describe_change_set(ChangeSetName=change_set_id)
            return (
                # TODO: CREATE_FAILED should also not lead to further retries
                change_set.get("Status") == "FAILED"
                and change_set.get("ExecutionStatus") == "UNAVAILABLE"
            )

        return _inner

    return _is_change_set_failed_and_unavailable
1229
@pytest.fixture
def is_stack_created(aws_client):
    """Predicate factory: stack reached a terminal creation state."""
    terminal_states = ["CREATE_COMPLETE", "CREATE_FAILED"]
    return _has_stack_status(aws_client.cloudformation, terminal_states)
1234
@pytest.fixture
def is_stack_updated(aws_client):
    """Predicate factory: stack reached a terminal update state."""
    terminal_states = ["UPDATE_COMPLETE", "UPDATE_FAILED"]
    return _has_stack_status(aws_client.cloudformation, terminal_states)
1239
@pytest.fixture
def is_stack_deleted(aws_client):
    """Predicate factory: stack deletion has completed."""
    terminal_states = ["DELETE_COMPLETE"]
    return _has_stack_status(aws_client.cloudformation, terminal_states)
1244
def _has_stack_status(cfn_client, statuses: list[str]):
1✔
1245
    def _has_status(stack_id: str):
1✔
1246
        def _inner():
1✔
1247
            resp = cfn_client.describe_stacks(StackName=stack_id)
1✔
1248
            s = resp["Stacks"][0]  # since the lookup  uses the id we can only get a single response
1✔
1249
            return s.get("StackStatus") in statuses
1✔
1250

1251
        return _inner
1✔
1252

1253
    return _has_status
1✔
1254

1255

1256
@pytest.fixture
def is_change_set_finished(aws_client):
    """Predicate factory: change set execution completed; short-circuits on failure."""

    def _is_change_set_finished(change_set_id: str, stack_name: Optional[str] = None):
        def _inner():
            describe_kwargs = {"ChangeSetName": change_set_id}
            if stack_name:
                describe_kwargs["StackName"] = stack_name

            change_set = aws_client.cloudformation.describe_change_set(**describe_kwargs)
            execution_status = change_set.get("ExecutionStatus")

            # a failed execution will never complete, so stop waiting immediately
            if execution_status == "EXECUTE_FAILED":
                LOG.warning("Change set failed")
                raise ShortCircuitWaitException()

            return execution_status == "EXECUTE_COMPLETE"

        return _inner

    return _is_change_set_finished
1277
@pytest.fixture
def wait_until_lambda_ready(aws_client):
    """Fixture returning a waiter that blocks until a Lambda function leaves the
    "Pending" state.

    The optional ``qualifier`` (version or alias) is forwarded to
    ``get_function`` so the state of that specific qualifier is checked.
    """

    def _wait_until_ready(function_name: str, qualifier: str = None, client=None):
        client = client or aws_client.lambda_

        def _is_not_pending():
            kwargs = {}
            if qualifier:
                kwargs["Qualifier"] = qualifier
            try:
                # bugfix: the Qualifier kwargs were built but never passed to
                # get_function, so the qualifier argument was silently ignored
                result = (
                    client.get_function(FunctionName=function_name, **kwargs)["Configuration"][
                        "State"
                    ]
                    != "Pending"
                )
                LOG.debug("lambda state result: result=%s", result)
                return result
            except Exception as e:
                LOG.error(e)
                raise

        wait_until(_is_not_pending)

    return _wait_until_ready
1302
role_assume_policy = """
1✔
1303
{
1304
  "Version": "2012-10-17",
1305
  "Statement": [
1306
    {
1307
      "Effect": "Allow",
1308
      "Principal": {
1309
        "Service": "lambda.amazonaws.com"
1310
      },
1311
      "Action": "sts:AssumeRole"
1312
    }
1313
  ]
1314
}
1315
""".strip()
1316

1317
role_policy = """
1✔
1318
{
1319
    "Version": "2012-10-17",
1320
    "Statement": [
1321
        {
1322
            "Effect": "Allow",
1323
            "Action": [
1324
                "logs:CreateLogGroup",
1325
                "logs:CreateLogStream",
1326
                "logs:PutLogEvents"
1327
            ],
1328
            "Resource": [
1329
                "*"
1330
            ]
1331
        }
1332
    ]
1333
}
1334
""".strip()
1335

1336

1337
@pytest.fixture
def create_lambda_function_aws(aws_client):
    """Factory fixture creating Lambda functions with raw boto parameters; deletes them on teardown."""
    created_function_arns = []

    def _create_lambda_function(**kwargs):
        def _create_function():
            response = aws_client.lambda_.create_function(**kwargs)
            created_function_arns.append(response["FunctionArn"])

            def _is_not_pending():
                try:
                    state = aws_client.lambda_.get_function(FunctionName=response["FunctionName"])[
                        "Configuration"
                    ]["State"]
                    return state != "Pending"
                except Exception as e:
                    LOG.error(e)
                    raise

            wait_until(_is_not_pending)
            return response

        # @AWS, takes about 10s until the role/policy is "active", until then it will fail
        # localstack should normally not require the retries and will just continue here
        return retry(_create_function, retries=3, sleep=4)

    yield _create_lambda_function

    for function_arn in created_function_arns:
        try:
            aws_client.lambda_.delete_function(FunctionName=function_arn)
        except Exception:
            LOG.debug("Unable to delete function arn=%s in cleanup", function_arn)
1375
@pytest.fixture
def create_lambda_function(aws_client, wait_until_lambda_ready, lambda_su_role):
    """Factory fixture creating Lambda functions via testutil; deletes functions
    and their log groups on teardown."""
    created_functions = []
    created_log_groups = []

    def _create_lambda_function(*args, **kwargs):
        client = kwargs.get("client") or aws_client.lambda_
        kwargs["client"] = client
        kwargs["s3_client"] = aws_client.s3
        func_name = kwargs.pop("func_name", None)
        assert func_name

        if not kwargs.get("role"):
            kwargs["role"] = lambda_su_role

        def _create_function():
            response = testutil.create_lambda_function(func_name, **kwargs)
            created_functions.append((response["CreateFunctionResponse"]["FunctionArn"], client))
            wait_until_lambda_ready(function_name=func_name, client=client)
            created_log_groups.append(f"/aws/lambda/{func_name}")
            return response

        # @AWS, takes about 10s until the role/policy is "active", until then it will fail
        # localstack should normally not require the retries and will just continue here
        return retry(_create_function, retries=3, sleep=4)

    yield _create_lambda_function

    for function_arn, client in created_functions:
        try:
            client.delete_function(FunctionName=function_arn)
        except Exception:
            LOG.debug("Unable to delete function arn=%s in cleanup", function_arn)

    for log_group_name in created_log_groups:
        try:
            aws_client.logs.delete_log_group(logGroupName=log_group_name)
        except Exception:
            LOG.debug("Unable to delete log group %s in cleanup", log_group_name)

1421
@pytest.fixture
def create_echo_http_server(aws_client, create_lambda_function):
    """Factory fixture deploying a Lambda (exposed via a public function URL)
    that echoes back the details of any HTTP request it receives."""
    from localstack.aws.api.lambda_ import Runtime

    lambda_client = aws_client.lambda_
    # Inlined Lambda handler source. The TRIM_X_HEADERS env var controls whether
    # x-amzn/x-forwarded-* headers are stripped from the echoed response.
    handler_code = textwrap.dedent(
        """
    import json
    import os


    def make_response(body: dict, status_code: int = 200):
        return {
            "statusCode": status_code,
            "headers": {"Content-Type": "application/json"},
            "body": body,
        }


    def trim_headers(headers):
        if not int(os.getenv("TRIM_X_HEADERS", 0)):
            return headers
        return {
            key: value for key, value in headers.items()
            if not (key.startswith("x-amzn") or key.startswith("x-forwarded-"))
        }


    def handler(event, context):
        print(json.dumps(event))
        response = {
            "args": event.get("queryStringParameters", {}),
            "data": event.get("body", ""),
            "domain": event["requestContext"].get("domainName", ""),
            "headers": trim_headers(event.get("headers", {})),
            "method": event["requestContext"]["http"].get("method", ""),
            "origin": event["requestContext"]["http"].get("sourceIp", ""),
            "path": event["requestContext"]["http"].get("path", ""),
        }
        return make_response(response)"""
    )

    def _create_echo_http_server(trim_x_headers: bool = False) -> str:
        """Creates a server that will echo any request. Any request will be returned with the
        following format. Any unset values will have those defaults.
        `trim_x_headers` can be set to True to trim some headers that are automatically added by lambda in
        order to create easier Snapshot testing. Default: `False`
        {
          "args": {},
          "headers": {},
          "data": "",
          "method": "",
          "domain": "",
          "origin": "",
          "path": ""
        }"""
        zip_file = testutil.create_lambda_archive(handler_code, get_content=True)
        func_name = f"echo-http-{short_uid()}"
        create_lambda_function(
            func_name=func_name,
            zip_file=zip_file,
            runtime=Runtime.python3_12,
            envvars={"TRIM_X_HEADERS": "1" if trim_x_headers else "0"},
        )
        # expose the function publicly via an unauthenticated function URL
        url_response = lambda_client.create_function_url_config(
            FunctionName=func_name, AuthType="NONE"
        )
        aws_client.lambda_.add_permission(
            FunctionName=func_name,
            StatementId="urlPermission",
            Action="lambda:InvokeFunctionUrl",
            Principal="*",
            FunctionUrlAuthType="NONE",
        )
        return url_response["FunctionUrl"]

    yield _create_echo_http_server
1500
@pytest.fixture
def create_event_source_mapping(aws_client):
    """Factory fixture creating Lambda event source mappings; deletes them on teardown."""
    mapping_uuids = []

    def _create_event_source_mapping(*args, **kwargs):
        response = aws_client.lambda_.create_event_source_mapping(*args, **kwargs)
        mapping_uuids.append(response["UUID"])
        return response

    yield _create_event_source_mapping

    for mapping_uuid in mapping_uuids:
        try:
            aws_client.lambda_.delete_event_source_mapping(UUID=mapping_uuid)
        except Exception:
            LOG.debug("Unable to delete event source mapping %s in cleanup", mapping_uuid)
1518
@pytest.fixture
def check_lambda_logs(aws_client):
    """Fixture returning a helper asserting expected lines appear in a function's logs.

    Expected lines containing ``.*`` are treated as regex patterns (DOTALL);
    all others must match a log message exactly. Returns all log messages.
    """

    def _check_logs(func_name: str, expected_lines: list[str] = None) -> list[str]:
        log_events = get_lambda_logs(func_name, logs_client=aws_client.logs)
        log_messages = [event["message"] for event in log_events]
        for expected in expected_lines or []:
            if ".*" in expected and any(
                re.match(expected, message, flags=re.DOTALL) for message in log_messages
            ):
                continue
            assert expected in log_messages
        return log_messages

    return _check_logs
1536
@pytest.fixture
def create_policy(aws_client):
    """Factory fixture creating IAM policies; deletes them on teardown."""
    created_policies = []

    def _create_policy(*args, iam_client=None, **kwargs):
        iam_client = iam_client or aws_client.iam
        if "PolicyName" not in kwargs:
            kwargs["PolicyName"] = f"policy-{short_uid()}"
        response = iam_client.create_policy(*args, **kwargs)
        created_policies.append((response["Policy"]["Arn"], iam_client))
        return response

    yield _create_policy

    for policy_arn, iam_client in created_policies:
        try:
            iam_client.delete_policy(PolicyArn=policy_arn)
        except Exception:
            LOG.debug("Could not delete policy '%s' during test cleanup", policy_arn)
1558
@pytest.fixture
def create_user(aws_client):
    """Factory fixture creating IAM users; on teardown, removes each user's
    inline policies, attached policies, and access keys before deleting the
    user itself (a user can only be deleted once its dependents are gone)."""
    usernames = []

    def _create_user(**kwargs):
        if "UserName" not in kwargs:
            kwargs["UserName"] = f"user-{short_uid()}"
        response = aws_client.iam.create_user(**kwargs)
        usernames.append(response["User"]["UserName"])
        return response

    yield _create_user

    for username in usernames:
        # If listing policies fails, the user was most likely already deleted
        # by the test itself — skip the remaining cleanup for this user.
        try:
            inline_policies = aws_client.iam.list_user_policies(UserName=username)["PolicyNames"]
        except ClientError as e:
            LOG.debug(
                "Cannot list user policies: %s. User %s probably already deleted...", e, username
            )
            continue

        # 1) remove inline policies
        for inline_policy in inline_policies:
            try:
                aws_client.iam.delete_user_policy(UserName=username, PolicyName=inline_policy)
            except Exception:
                LOG.debug(
                    "Could not delete user policy '%s' from '%s' during cleanup",
                    inline_policy,
                    username,
                )
        # 2) detach managed policies
        attached_policies = aws_client.iam.list_attached_user_policies(UserName=username)[
            "AttachedPolicies"
        ]
        for attached_policy in attached_policies:
            try:
                aws_client.iam.detach_user_policy(
                    UserName=username, PolicyArn=attached_policy["PolicyArn"]
                )
            except Exception:
                LOG.debug(
                    "Error detaching policy '%s' from user '%s'",
                    attached_policy["PolicyArn"],
                    username,
                )
        # 3) delete access keys
        access_keys = aws_client.iam.list_access_keys(UserName=username)["AccessKeyMetadata"]
        for access_key in access_keys:
            try:
                aws_client.iam.delete_access_key(
                    UserName=username, AccessKeyId=access_key["AccessKeyId"]
                )
            except Exception:
                LOG.debug(
                    "Error deleting access key '%s' from user '%s'",
                    access_key["AccessKeyId"],
                    username,
                )

        # 4) finally delete the user itself
        try:
            aws_client.iam.delete_user(UserName=username)
        except Exception as e:
            LOG.debug("Error deleting user '%s' during test cleanup: %s", username, e)
1622
@pytest.fixture
def wait_and_assume_role(aws_client):
    """Returns a helper that assumes an IAM role, retrying until AWS allows it."""

    def _wait_and_assume_role(role_arn: str, session_name: str = None, **kwargs):
        session_name = session_name or f"session-{short_uid()}"

        def assume_role():
            response = aws_client.sts.assume_role(
                RoleArn=role_arn, RoleSessionName=session_name, **kwargs
            )
            return response["Credentials"]

        # a freshly created role typically cannot be assumed right away on AWS,
        # so retry a couple of times before giving up
        return retry(assume_role, sleep=5, retries=4)

    return _wait_and_assume_role
1638

1639

1640
@pytest.fixture
def create_role(aws_client):
    """Factory fixture to create IAM roles; detaches/deletes their policies and
    deletes the roles on teardown (best-effort, failures are logged)."""
    created = []

    def _create_role(iam_client=None, **kwargs):
        if not kwargs.get("RoleName"):
            kwargs["RoleName"] = f"role-{short_uid()}"
        client = iam_client or aws_client.iam
        response = client.create_role(**kwargs)
        created.append((response["Role"]["RoleName"], client))
        return response

    yield _create_role

    for name, client in created:
        # detach managed policies first
        try:
            attached = client.list_attached_role_policies(RoleName=name)["AttachedPolicies"]
        except ClientError as e:
            LOG.debug(
                "Cannot list attached role policies: %s. Role %s probably already deleted...",
                e,
                name,
            )
            continue
        for policy in attached:
            try:
                client.detach_role_policy(RoleName=name, PolicyArn=policy["PolicyArn"])
            except Exception:
                LOG.debug(
                    "Could not detach role policy '%s' from '%s' during cleanup",
                    policy["PolicyArn"],
                    name,
                )
        # then remove inline policies
        for inline_policy_name in client.list_role_policies(RoleName=name)["PolicyNames"]:
            try:
                client.delete_role_policy(RoleName=name, PolicyName=inline_policy_name)
            except Exception:
                LOG.debug(
                    "Could not delete role policy '%s' from '%s' during cleanup",
                    inline_policy_name,
                    name,
                )
        try:
            client.delete_role(RoleName=name)
        except Exception:
            LOG.debug("Could not delete role '%s' during cleanup", name)
1692

1693

1694
@pytest.fixture
def create_parameter(aws_client):
    """Factory fixture to create SSM parameters; deletes them on teardown."""
    created_names = []

    def _create_parameter(**kwargs):
        created_names.append(kwargs["Name"])
        return aws_client.ssm.put_parameter(**kwargs)

    yield _create_parameter

    for name in created_names:
        aws_client.ssm.delete_parameter(Name=name)
1706

1707

1708
@pytest.fixture
def create_secret(aws_client):
    """Factory fixture to create Secrets Manager secrets; force-deletes them on
    teardown (no recovery window)."""
    secret_arns = []

    # NOTE: the inner factory was previously named `_create_parameter`, a
    # copy-paste leftover from the SSM fixture above; renamed for clarity.
    # The yielded callable's behavior is unchanged.
    def _create_secret(**kwargs):
        create_response = aws_client.secretsmanager.create_secret(**kwargs)
        secret_arns.append(create_response["ARN"])
        return create_response

    yield _create_secret

    for arn in secret_arns:
        aws_client.secretsmanager.delete_secret(SecretId=arn, ForceDeleteWithoutRecovery=True)
1721

1722

1723
# TODO Figure out how to make cert creation tests pass against AWS.
1724
#
1725
# We would like to have localstack tests to pass not just against localstack, but also against AWS to make sure
1726
# our emulation is correct. Unfortunately, with certificate creation there are some issues.
1727
#
1728
# In AWS newly created ACM certificates have to be validated either by email or by DNS. The latter is
1729
# by adding some CNAME records as requested by AWS in response to a certificate request.
1730
# For testing purposes the DNS one seems to be easier, at least as long as DNS is handled by the Route53 AWS DNS service.
1731
#
1732
# The other possible option is to use IAM certificates instead of ACM ones. Those just have to be uploaded from files
1733
# created by openssl etc. Not sure if there are other issues after that.
1734
#
1735
# The third option might be having in AWS some certificates created in advance - so they do not require validation
1736
# and can be easily used in tests. The issue with such an approach is that for AppSync, for example, in order to
1737
# register a domain name (https://docs.aws.amazon.com/appsync/latest/APIReference/API_CreateDomainName.html),
1738
# the domain name in the API request has to match the domain name used in certificate creation. Which means that with
1739
# pre-created certificates we would have to use specific domain names instead of random ones.
1740
@pytest.fixture
def acm_request_certificate(aws_client_factory):
    """Factory fixture to request ACM certificates; deletes them on teardown."""
    certificate_arns = []

    # Fixed return annotation: the factory returns the full RequestCertificate
    # response (a dict), not just the ARN string as previously annotated.
    def factory(**kwargs) -> dict:
        """Request a certificate and return the RequestCertificate response.

        Accepts an optional ``region_name`` kwarg to target a specific region;
        a random test domain is generated when ``DomainName`` is not given.
        """
        if "DomainName" not in kwargs:
            kwargs["DomainName"] = f"test-domain-{short_uid()}.localhost.localstack.cloud"

        region_name = kwargs.pop("region_name", None)
        acm_client = aws_client_factory(region_name=region_name).acm

        response = acm_client.request_certificate(**kwargs)
        created_certificate_arn = response["CertificateArn"]
        certificate_arns.append((created_certificate_arn, region_name))
        return response

    yield factory

    # cleanup
    for certificate_arn, region_name in certificate_arns:
        try:
            acm_client = aws_client_factory(region_name=region_name).acm
            acm_client.delete_certificate(CertificateArn=certificate_arn)
        except Exception as e:
            LOG.debug("error cleaning up certificate %s: %s", certificate_arn, e)
1765

1766

1767
# Wildcard ("superuser") policy document: allows every action on every resource.
# Attached to autogenerated test roles, e.g. by the lambda_su_role fixture below.
role_policy_su = {
    "Version": "2012-10-17",
    "Statement": [{"Effect": "Allow", "Action": ["*"], "Resource": ["*"]}],
}
1771

1772

1773
@pytest.fixture(scope="session")
def lambda_su_role(aws_client):
    """Session-scoped IAM role with a wildcard ("*" on "*") policy attached,
    suitable as a Lambda execution role in tests.

    Yields the role ARN; detaches and deletes the policy and role on teardown.
    """
    role_name = f"lambda-autogenerated-{short_uid()}"
    role = aws_client.iam.create_role(
        RoleName=role_name, AssumeRolePolicyDocument=role_assume_policy
    )["Role"]
    policy_name = f"lambda-autogenerated-{short_uid()}"
    policy_arn = aws_client.iam.create_policy(
        PolicyName=policy_name, PolicyDocument=json.dumps(role_policy_su)
    )["Policy"]["Arn"]
    aws_client.iam.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)

    if is_aws_cloud():  # dirty but necessary: wait for IAM propagation
        time.sleep(10)

    yield role["Arn"]

    # Pass the callable plus kwargs to run_safe instead of calling the client
    # method eagerly: previously `run_safe(client.method(...))` invoked the
    # method *before* run_safe could catch its exceptions, so any cleanup
    # failure escaped the intended best-effort handling.
    run_safe(aws_client.iam.detach_role_policy, RoleName=role_name, PolicyArn=policy_arn)
    run_safe(aws_client.iam.delete_role, RoleName=role_name)
    run_safe(aws_client.iam.delete_policy, PolicyArn=policy_arn)
1793

1794

1795
@pytest.fixture
def create_iam_role_and_attach_policy(aws_client):
    """
    Fixture that creates an IAM role with given role definition and predefined policy ARN.

    Use this fixture with AWS managed policies like 'AmazonS3ReadOnlyAccess' or 'AmazonKinesisFullAccess'.
    """
    # (role_name, policy_arn) pairs, so the cleanup can detach before deleting
    roles: list[tuple[str, str]] = []

    def _inner(**kwargs: dict[str, Any]) -> str:
        """
        :param dict RoleDefinition: role definition document
        :param str PolicyArn: policy ARN
        :param str RoleName: role name (autogenerated if omitted)
        :return: role ARN
        """
        if "RoleName" not in kwargs:
            kwargs["RoleName"] = f"test-role-{short_uid()}"

        role = kwargs["RoleName"]
        role_policy = json.dumps(kwargs["RoleDefinition"])

        result = aws_client.iam.create_role(RoleName=role, AssumeRolePolicyDocument=role_policy)
        role_arn = result["Role"]["Arn"]

        policy_arn = kwargs["PolicyArn"]
        aws_client.iam.attach_role_policy(PolicyArn=policy_arn, RoleName=role)

        roles.append((role, policy_arn))
        return role_arn

    yield _inner

    for role, policy_arn in roles:
        # Detach the managed policy first: on AWS, DeleteRole fails with a
        # DeleteConflict while any managed policy is still attached. The
        # previous cleanup skipped this step, leaking roles against AWS.
        try:
            aws_client.iam.detach_role_policy(RoleName=role, PolicyArn=policy_arn)
        except Exception as exc:
            LOG.debug("Error detaching policy '%s' from role '%s': %s", policy_arn, role, exc)
        try:
            aws_client.iam.delete_role(RoleName=role)
        except Exception as exc:
            LOG.debug("Error deleting IAM role '%s': %s", role, exc)
1833

1834

1835
@pytest.fixture
def create_iam_role_with_policy(aws_client):
    """
    Fixture that creates an IAM role with given role definition and policy definition.
    """
    # maps role name -> inline policy name for teardown
    roles = {}

    # Annotation fixed: `any` is the builtin function, the intended type is
    # `typing.Any` (already imported at the top of this file).
    def _create_role_and_policy(**kwargs: dict[str, Any]) -> str:
        """
        :param dict RoleDefinition: role definition document
        :param dict PolicyDefinition: policy definition document
        :param str PolicyName: policy name (autogenerated if omitted)
        :param str RoleName: role name (autogenerated if omitted)
        :return: role ARN
        """
        if "RoleName" not in kwargs:
            kwargs["RoleName"] = f"test-role-{short_uid()}"
        role = kwargs["RoleName"]
        if "PolicyName" not in kwargs:
            kwargs["PolicyName"] = f"test-policy-{short_uid()}"
        policy = kwargs["PolicyName"]
        role_policy = json.dumps(kwargs["RoleDefinition"])

        result = aws_client.iam.create_role(RoleName=role, AssumeRolePolicyDocument=role_policy)
        role_arn = result["Role"]["Arn"]

        policy_document = json.dumps(kwargs["PolicyDefinition"])
        aws_client.iam.put_role_policy(
            RoleName=role, PolicyName=policy, PolicyDocument=policy_document
        )
        roles[role] = policy
        return role_arn

    yield _create_role_and_policy

    for role_name, policy_name in roles.items():
        # inline policies must be removed before the role can be deleted
        try:
            aws_client.iam.delete_role_policy(RoleName=role_name, PolicyName=policy_name)
        except Exception as exc:
            LOG.debug("Error deleting IAM role policy '%s' '%s': %s", role_name, policy_name, exc)
        try:
            aws_client.iam.delete_role(RoleName=role_name)
        except Exception as exc:
            LOG.debug("Error deleting IAM role '%s': %s", role_name, exc)
1879

1880

1881
@pytest.fixture
def firehose_create_delivery_stream(wait_for_delivery_stream_ready, aws_client):
    """Factory fixture creating Firehose delivery streams, waiting for readiness,
    and deleting them on teardown."""
    stream_names = []

    def _create_delivery_stream(**kwargs):
        if "DeliveryStreamName" not in kwargs:
            kwargs["DeliveryStreamName"] = f"test-delivery-stream-{short_uid()}"
        name = kwargs["DeliveryStreamName"]
        response = aws_client.firehose.create_delivery_stream(**kwargs)
        stream_names.append(name)
        # block until the stream is usable before handing it to the test
        wait_for_delivery_stream_ready(name)
        return response

    yield _create_delivery_stream

    for name in stream_names:
        try:
            aws_client.firehose.delete_delivery_stream(DeliveryStreamName=name)
        except Exception:
            LOG.info("Failed to delete delivery stream %s", name)
1901

1902

1903
@pytest.fixture
def ses_configuration_set(aws_client):
    """Factory fixture creating SES configuration sets; deletes them on teardown."""
    created = []

    def factory(name: str) -> None:
        aws_client.ses.create_configuration_set(
            ConfigurationSet={
                "Name": name,
            },
        )
        created.append(name)

    yield factory

    for name in created:
        aws_client.ses.delete_configuration_set(ConfigurationSetName=name)
1919

1920

1921
@pytest.fixture
def ses_configuration_set_sns_event_destination(aws_client):
    """Factory fixture creating SNS event destinations on SES configuration sets;
    deletes them on teardown."""
    created = []

    def factory(config_set_name: str, event_destination_name: str, topic_arn: str) -> None:
        destination = {
            "Name": event_destination_name,
            "Enabled": True,
            "MatchingEventTypes": ["send", "bounce", "delivery", "open", "click"],
            "SNSDestination": {
                "TopicARN": topic_arn,
            },
        }
        aws_client.ses.create_configuration_set_event_destination(
            ConfigurationSetName=config_set_name,
            EventDestination=destination,
        )
        created.append((config_set_name, event_destination_name))

    yield factory

    for config_set, destination_name in created:
        aws_client.ses.delete_configuration_set_event_destination(
            ConfigurationSetName=config_set,
            EventDestinationName=destination_name,
        )

1947

1948
@pytest.fixture
def ses_email_template(aws_client):
    """Factory fixture creating SES email templates; deletes them on teardown."""
    template_names = []

    def factory(name: str, contents: str, subject: Optional[str] = None):
        # The previous default (an f-string in the signature) was evaluated only
        # once at function-definition time, so every call without an explicit
        # subject shared the same generated subject line. Generate it per call.
        if subject is None:
            subject = f"Email template {short_uid()}"
        aws_client.ses.create_template(
            Template={
                "TemplateName": name,
                "SubjectPart": subject,
                "TextPart": contents,
            }
        )
        template_names.append(name)

    yield factory

    for template_name in template_names:
        aws_client.ses.delete_template(TemplateName=template_name)
1966

1967

1968
@pytest.fixture
def ses_verify_identity(aws_client):
    """Factory fixture to verify SES email identities; deletes them on teardown."""
    identities = []

    def factory(email_address: str) -> None:
        aws_client.ses.verify_email_identity(EmailAddress=email_address)
        # Track the identity so teardown can delete it. Previously nothing was
        # appended here, so the cleanup loop below never ran and verified
        # identities leaked between tests.
        identities.append(email_address)

    yield factory

    for identity in identities:
        aws_client.ses.delete_identity(Identity=identity)
1979

1980

1981
@pytest.fixture
def setup_sender_email_address(ses_verify_identity):
    """
    If the test is running against AWS then assume the email address passed is already
    verified, and passes the given email address through. Otherwise, it generates one random
    email address and verify them.
    """

    def inner(sender_email_address: Optional[str] = None) -> str:
        if is_aws_cloud():
            if sender_email_address is None:
                raise ValueError(
                    "sender_email_address must be specified to run this test against AWS"
                )
            return sender_email_address

        # overwrite the given parameters with localstack specific ones
        generated_address = f"sender-{short_uid()}@example.com"
        ses_verify_identity(generated_address)
        return generated_address

    return inner
2003

2004

2005
@pytest.fixture
def ec2_create_security_group(aws_client):
    """Factory fixture creating EC2 security groups with optional ingress rules;
    deletes the groups on teardown."""
    ec2_sgs = []

    def factory(ports=None, ip_protocol: str = "tcp", **kwargs):
        """
        Create the security group and authorize the security group ingress.

        :param ports: list of ports to be authorized for the ingress rule;
            if None or empty, no ingress rule is created.
        :param ip_protocol: the ip protocol for the permissions (tcp by default)
        """
        if "GroupName" not in kwargs:
            # Do not use an "sg-" prefix: AWS rejects it with
            # "Group names may not be in the format sg-*".
            kwargs["GroupName"] = f"test-sg-{short_uid()}"
        # Making sure the call to CreateSecurityGroup gets the right arguments
        _args = select_from_typed_dict(CreateSecurityGroupRequest, kwargs)
        security_group = aws_client.ec2.create_security_group(**_args)
        security_group_id = security_group["GroupId"]

        permissions = [
            {
                "FromPort": port,
                "IpProtocol": ip_protocol,
                "IpRanges": [{"CidrIp": "0.0.0.0/0"}],
                "ToPort": port,
            }
            for port in ports or []
        ]
        # Skip the call entirely when no ports were given:
        # authorize_security_group_ingress fails with an empty IpPermissions list.
        if permissions:
            aws_client.ec2.authorize_security_group_ingress(
                GroupId=security_group_id,
                IpPermissions=permissions,
            )

        ec2_sgs.append(security_group_id)
        return security_group

    yield factory

    for sg_group_id in ec2_sgs:
        try:
            aws_client.ec2.delete_security_group(GroupId=sg_group_id)
        except Exception as e:
            LOG.debug("Error cleaning up EC2 security group: %s, %s", sg_group_id, e)
2050

2051

2052
@pytest.fixture
def ec2_create_vpc_endpoint(aws_client):
    """Factory fixture creating VPC endpoints and waiting until they are available.

    On teardown, deletes all created endpoints and (against AWS) waits for the
    deletion to complete, since a VPC cannot be deleted while endpoints exist.
    """
    vpc_endpoints = []

    def _create(**kwargs: Unpack[CreateVpcEndpointRequest]) -> VpcEndpoint:
        endpoint = aws_client.ec2.create_vpc_endpoint(**kwargs)
        endpoint_id = endpoint["VpcEndpoint"]["VpcEndpointId"]
        vpc_endpoints.append(endpoint_id)

        def _check_available() -> VpcEndpoint:
            result = aws_client.ec2.describe_vpc_endpoints(VpcEndpointIds=[endpoint_id])
            _endpoint_details = result["VpcEndpoints"][0]
            assert _endpoint_details["State"] == "available"

            return _endpoint_details

        return retry(_check_available, retries=30, sleep=5 if is_aws_cloud() else 1)

    yield _create

    try:
        aws_client.ec2.delete_vpc_endpoints(VpcEndpointIds=vpc_endpoints)
    except Exception as e:
        LOG.error("Error cleaning up VPC endpoint: %s, %s", vpc_endpoints, e)

    def wait_for_endpoint_deleted():
        try:
            endpoints = aws_client.ec2.describe_vpc_endpoints(VpcEndpointIds=vpc_endpoints)
            # The EC2 API reports lowercase states (cf. the "available" check
            # above); the previous comparison against capitalized "Deleted"
            # could never match, so the wait always ran out of retries.
            # Compare case-insensitively to be robust either way.
            assert len(endpoints["VpcEndpoints"]) == 0 or all(
                endpoint["State"].lower() == "deleted" for endpoint in endpoints["VpcEndpoints"]
            )
        except botocore.exceptions.ClientError:
            pass

    # the vpc can't be deleted if an endpoint exists
    if is_aws_cloud():
        retry(wait_for_endpoint_deleted, retries=30, sleep=10 if is_aws_cloud() else 1)

2090

2091
@pytest.fixture
def cleanups():
    """Yields a list to which tests append cleanup callables; they are executed
    in reverse (LIFO) order during teardown, with failures logged, not raised."""
    callbacks = []

    yield callbacks

    for callback in reversed(callbacks):
        try:
            callback()
        except Exception as e:
            LOG.warning("Failed to execute cleanup", exc_info=e)
2102

2103

2104
@pytest.fixture(scope="session")
def account_id(aws_client):
    """Account ID of the primary test client (via STS when available)."""
    if not (is_aws_cloud() or is_api_enabled("sts")):
        return TEST_AWS_ACCOUNT_ID
    return aws_client.sts.get_caller_identity()["Account"]
2110

2111

2112
@pytest.fixture(scope="session")
def region_name(aws_client):
    """Region name of the primary test client (via the STS client's metadata)."""
    if not (is_aws_cloud() or is_api_enabled("sts")):
        return TEST_AWS_REGION_NAME
    return aws_client.sts.meta.region_name
2118

2119

2120
@pytest.fixture(scope="session")
def partition(region_name):
    """AWS partition (e.g. "aws", "aws-cn") derived from the primary test region."""
    return get_partition(region_name)
2123

2124

2125
@pytest.fixture(scope="session")
def secondary_account_id(secondary_aws_client):
    """Account ID of the secondary test client, for cross-account tests."""
    if not (is_aws_cloud() or is_api_enabled("sts")):
        return SECONDARY_TEST_AWS_ACCOUNT_ID
    return secondary_aws_client.sts.get_caller_identity()["Account"]
2131

2132

2133
@pytest.fixture(scope="session")
def secondary_region_name():
    """Region name used for cross-region tests (distinct from the primary region)."""
    return SECONDARY_TEST_AWS_REGION_NAME
2136

2137

2138
@pytest.hookimpl
def pytest_collection_modifyitems(config: Config, items: list[Item]):
    """Collection hook: skip *only_localstack tests when running against AWS,
    and tag tests using the snapshot fixture as snapshot_validated."""
    skip_on_aws = pytest.mark.skipif(
        is_aws_cloud(),
        reason="test only applicable if run against localstack",
    )
    for item in items:
        for mark in item.iter_markers():
            if mark.name.endswith("only_localstack"):
                item.add_marker(skip_on_aws)
        if "snapshot" in getattr(item, "fixturenames", []):
            # mark snapshot-validated tests so they can be selected with
            # "-m snapshot_validated", e.g. to re-capture snapshots for a
            # whole test file at once
            item.add_marker("snapshot_validated")
2154

2155

2156
@pytest.fixture
def sample_stores() -> AccountRegionBundle:
    """Provides a minimal store bundle (service namespace "zzz") exercising all
    three attribute scopes: cross-account, cross-region, and region-local."""

    class SampleStore(BaseStore):
        # shared across all accounts and regions
        CROSS_ACCOUNT_ATTR = CrossAccountAttribute(default=list)
        # shared across regions within one account
        CROSS_REGION_ATTR = CrossRegionAttribute(default=list)
        # scoped to a single account/region pair
        region_specific_attr = LocalAttribute(default=list)

    # validate=False: "zzz" is not a real service name, skip the check
    return AccountRegionBundle("zzz", SampleStore, validate=False)
2164

2165

2166
@pytest.fixture
def create_rest_apigw(aws_client_factory):
    """Factory fixture to create API Gateway REST APIs.

    On teardown, deletes each created API as well as any usage plans (and the
    API keys linked to them) that reference the API.
    """
    rest_apis = []
    retry_boto_config = None
    if is_aws_cloud():
        retry_boto_config = botocore.config.Config(
            # Api gateway can throttle requests pretty heavily. Leading to potentially undeleted apis
            retries={"max_attempts": 10, "mode": "adaptive"}
        )

    def _create_apigateway_function(**kwargs):
        # optional "region_name" kwarg selects the client region; the remaining
        # kwargs are forwarded to create_rest_api
        client_region_name = kwargs.pop("region_name", None)
        apigateway_client = aws_client_factory(
            region_name=client_region_name, config=retry_boto_config
        ).apigateway
        kwargs.setdefault("name", f"api-{short_uid()}")

        response = apigateway_client.create_rest_api(**kwargs)
        api_id = response.get("id")
        rest_apis.append((api_id, client_region_name))

        return api_id, response.get("name"), response.get("rootResourceId")

    yield _create_apigateway_function

    for rest_api_id, _client_region_name in rest_apis:
        apigateway_client = aws_client_factory(
            region_name=_client_region_name,
            config=retry_boto_config,
        ).apigateway
        # First, retrieve the usage plans associated with the REST API
        usage_plan_ids = []
        usage_plans = apigateway_client.get_usage_plans()
        for item in usage_plans.get("items", []):
            api_stages = item.get("apiStages", [])
            usage_plan_ids.extend(
                item.get("id") for api_stage in api_stages if api_stage.get("apiId") == rest_api_id
            )

        # Then delete the API, as you can't delete the UsagePlan if a stage is associated with it
        with contextlib.suppress(Exception):
            apigateway_client.delete_rest_api(restApiId=rest_api_id)

        # finally delete the usage plans and the API Keys linked to it
        for usage_plan_id in usage_plan_ids:
            usage_plan_keys = apigateway_client.get_usage_plan_keys(usagePlanId=usage_plan_id)
            for key in usage_plan_keys.get("items", []):
                apigateway_client.delete_api_key(apiKey=key["id"])
            apigateway_client.delete_usage_plan(usagePlanId=usage_plan_id)

2216

2217
@pytest.fixture
def create_rest_apigw_openapi(aws_client_factory):
    """Factory fixture importing REST APIs from OpenAPI definitions; deletes the
    imported APIs on teardown."""
    created_apis = []

    def _create_apigateway_function(**kwargs):
        target_region = kwargs.pop("region_name", None)
        client = aws_client_factory(region_name=target_region).apigateway

        response = client.import_rest_api(**kwargs)
        api_id = response.get("id")
        created_apis.append((api_id, target_region))
        return api_id, response

    yield _create_apigateway_function

    for api_id, target_region in created_apis:
        with contextlib.suppress(Exception):
            client = aws_client_factory(region_name=target_region).apigateway
            client.delete_rest_api(restApiId=api_id)

2237

2238
@pytest.fixture
def assert_host_customisation(monkeypatch):
    """Patches LOCALSTACK_HOST to a sentinel value and yields an asserter that
    checks whether a given URL was built from the sentinel or a custom host."""
    sentinel_host = "foo.bar"
    monkeypatch.setattr(
        config, "LOCALSTACK_HOST", config.HostAndPort(host=sentinel_host, port=8888)
    )

    def asserter(
        url: str,
        *,
        custom_host: Optional[str] = None,
    ):
        if custom_host is None:
            assert sentinel_host in url, f"Could not find `{sentinel_host}` in `{url}`"
        else:
            assert custom_host in url, f"Could not find `{custom_host}` in `{url}`"

            assert sentinel_host not in url

    yield asserter
2258

2259

2260
@pytest.fixture
def echo_http_server(httpserver: HTTPServer):
    """Spins up a local HTTP echo server and returns the endpoint URL"""

    def _echo_handler(request: Request) -> Response:
        parsed_json = None
        if request.is_json:
            with contextlib.suppress(ValueError):
                parsed_json = json.loads(request.data)
        payload = {
            "data": request.data or "{}",
            "headers": dict(request.headers),
            "url": request.url,
            "method": request.method,
            "json": parsed_json,
        }
        return Response(json.dumps(json_safe(payload)), status=200)

    httpserver.expect_request("").respond_with_handler(_echo_handler)
    return httpserver.url_for("/")
2283

2284

2285
@pytest.fixture
def echo_http_server_post(echo_http_server):
    """
    Returns an HTTP echo server URL for POST requests that work both locally and for parity tests (against real AWS)
    """
    if not is_aws_cloud():
        return f"{echo_http_server}post"

    return f"{PUBLIC_HTTP_ECHO_SERVER_URL}/post"
2294

2295

2296
def create_policy_doc(effect: str, actions: list, resource=None) -> dict:
    """Build an IAM policy document with a single statement."""
    statement = {
        # TODO statement ids have to be alphanumeric [0-9A-Za-z], write a test for it
        "Sid": f"s{short_uid()}",
        "Effect": effect,
        "Action": ensure_list(actions),
        "Resource": resource if resource else "*",
    }
    return {
        "Version": "2012-10-17",
        "Statement": [statement],
    }

2312

2313
@pytest.fixture
def create_policy_generated_document(create_policy):
    """Returns a helper that generates a single-statement policy document,
    creates the policy, and returns its ARN."""

    def _create_policy_with_doc(effect, actions, policy_name=None, resource=None, iam_client=None):
        name = policy_name if policy_name else f"p-{short_uid()}"
        document = create_policy_doc(effect, actions, resource=resource)
        response = create_policy(
            PolicyName=name, PolicyDocument=json.dumps(document), iam_client=iam_client
        )
        return response["Policy"]["Arn"]

    return _create_policy_with_doc
2325

2326

2327
@pytest.fixture
def create_role_with_policy(create_role, create_policy_generated_document, aws_client):
    """Returns a helper that creates a role plus a generated policy, either as an
    attached managed policy or as an inline role policy."""

    def _create_role_with_policy(
        effect, actions, assume_policy_doc, resource=None, attach=True, iam_client=None
    ):
        client = iam_client or aws_client.iam

        role_name = f"role-{short_uid()}"
        create_response = create_role(
            RoleName=role_name, AssumeRolePolicyDocument=assume_policy_doc, iam_client=client
        )
        role_arn = create_response["Role"]["Arn"]
        policy_name = f"p-{short_uid()}"

        if attach:
            # managed policy, attached to the role
            policy_arn = create_policy_generated_document(
                effect, actions, policy_name=policy_name, resource=resource, iam_client=client
            )
            client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
        else:
            # inline policy, put directly on the role
            document = json.dumps(create_policy_doc(effect, actions, resource=resource))
            client.put_role_policy(
                RoleName=role_name, PolicyName=policy_name, PolicyDocument=document
            )

        return role_name, role_arn

    return _create_role_with_policy
2358

2359

2360
@pytest.fixture
def create_user_with_policy(create_policy_generated_document, create_user, aws_client):
    """Returns a helper that creates a user with an attached generated policy and
    returns the user name along with freshly created access keys."""

    def _create_user_with_policy(effect, actions, resource=None):
        policy_arn = create_policy_generated_document(effect, actions, resource=resource)
        user_name = f"user-{short_uid()}"
        create_user(UserName=user_name)
        aws_client.iam.attach_user_policy(UserName=user_name, PolicyArn=policy_arn)
        access_key = aws_client.iam.create_access_key(UserName=user_name)["AccessKey"]
        return user_name, access_key

    return _create_user_with_policy
2371

2372

2373
@pytest.fixture()
def register_extension(s3_bucket, aws_client):
    """Factory fixture to register CloudFormation extensions from a local
    artifact; deregisters all registered extension versions on teardown."""
    cfn_client = aws_client.cloudformation
    extensions_arns = []

    def _register(extension_name, extension_type, artifact_path):
        # upload the schema handler package to S3, where RegisterType expects it
        bucket = s3_bucket
        key = f"artifact-{short_uid()}"

        aws_client.s3.upload_file(artifact_path, bucket, key)

        register_response = cfn_client.register_type(
            Type=extension_type,
            TypeName=extension_name,
            SchemaHandlerPackage=f"s3://{bucket}/{key}",
        )

        # registration is asynchronous: wait for it to complete before describing
        registration_token = register_response["RegistrationToken"]
        cfn_client.get_waiter("type_registration_complete").wait(
            RegistrationToken=registration_token
        )

        describe_response = cfn_client.describe_type_registration(
            RegistrationToken=registration_token
        )

        extensions_arns.append(describe_response["TypeArn"])
        cfn_client.set_type_default_version(Arn=describe_response["TypeVersionArn"])

        return describe_response

    yield _register

    # cleanup: deregister each extension version individually, then the type itself
    for arn in extensions_arns:
        versions = cfn_client.list_type_versions(Arn=arn)["TypeVersionSummaries"]
        for v in versions:
            try:
                cfn_client.deregister_type(Arn=v["Arn"])
            except Exception:
                # best-effort; e.g. the default version cannot be deregistered
                # by version ARN — it is removed by the type-level call below
                continue
        cfn_client.deregister_type(Arn=arn)

2415

2416
@pytest.fixture
def hosted_zone(aws_client):
    """Factory fixture creating Route53 hosted zones.

    A unique ``CallerReference`` is generated when none is supplied. Created
    zones are deleted in reverse creation order during teardown.
    """
    created_zone_ids = []

    def factory(**kwargs):
        kwargs.setdefault("CallerReference", f"ref-{short_uid()}")
        response = aws_client.route53.create_hosted_zone(**kwargs)
        created_zone_ids.append(response["HostedZone"]["Id"])
        return response

    yield factory

    # delete newest first, mirroring reverse creation order
    while created_zone_ids:
        aws_client.route53.delete_hosted_zone(Id=created_zone_ids.pop())
2432

2433

2434
@pytest.fixture
def openapi_validate(monkeypatch):
    """Enable OpenAPI request and response validation for the test."""
    for flag in ("OPENAPI_VALIDATE_RESPONSE", "OPENAPI_VALIDATE_REQUEST"):
        monkeypatch.setattr(config, flag, "true")
2438

2439

2440
@pytest.fixture
def set_resource_custom_id():
    """Factory fixture to pin a custom ID for a resource via the LocalStack
    ID manager; all registered custom IDs are unset again on teardown."""
    registered_identifiers = []

    def _set_custom_id(resource_identifier: ResourceIdentifier, custom_id):
        localstack_id_manager.set_custom_id(
            resource_identifier=resource_identifier, custom_id=custom_id
        )
        registered_identifiers.append(resource_identifier)

    yield _set_custom_id

    for identifier in registered_identifiers:
        localstack_id_manager.unset_custom_id(identifier)
2454

2455

2456
###############################
2457
# Events (EventBridge) fixtures
2458
###############################
2459

2460

2461
@pytest.fixture
def events_create_event_bus(aws_client, region_name, account_id):
    """Factory fixture creating EventBridge event buses.

    On teardown, each created bus is cleaned up in dependency order: rule
    targets are removed, rules and archives are deleted, and finally the bus
    itself is removed. Individual cleanup failures are logged and do not
    abort cleanup of the remaining resources.
    """
    event_bus_names = []

    def _create_event_bus(**kwargs):
        if "Name" not in kwargs:
            kwargs["Name"] = f"test-event-bus-{short_uid()}"

        response = aws_client.events.create_event_bus(**kwargs)
        event_bus_names.append(kwargs["Name"])
        return response

    def _delete_rule(rule, event_bus_name):
        # a rule can only be deleted once all of its targets are removed
        response = aws_client.events.list_targets_by_rule(
            Rule=rule, EventBusName=event_bus_name
        )
        targets = [target["Id"] for target in response["Targets"]]
        for target in targets:
            aws_client.events.remove_targets(
                Rule=rule, EventBusName=event_bus_name, Ids=[target]
            )
        aws_client.events.delete_rule(Name=rule, EventBusName=event_bus_name)

    def _delete_archives(event_bus_name):
        # archives are looked up via the bus ARN, not the bus name
        event_source_arn = (
            f"arn:aws:events:{region_name}:{account_id}:event-bus/{event_bus_name}"
        )
        response = aws_client.events.list_archives(EventSourceArn=event_source_arn)
        archives = [archive["ArchiveName"] for archive in response["Archives"]]
        for archive in archives:
            try:
                aws_client.events.delete_archive(ArchiveName=archive)
            except Exception as e:
                LOG.warning("Failed to delete archive %s: %s", archive, e)

    yield _create_event_bus

    for event_bus_name in event_bus_names:
        try:
            response = aws_client.events.list_rules(EventBusName=event_bus_name)
            for rule in [rule["Name"] for rule in response["Rules"]]:
                try:
                    _delete_rule(rule, event_bus_name)
                except Exception as e:
                    LOG.warning("Failed to delete rule %s: %s", rule, e)

            _delete_archives(event_bus_name)
            aws_client.events.delete_event_bus(Name=event_bus_name)
        except Exception as e:
            LOG.warning("Failed to delete event bus %s: %s", event_bus_name, e)
2514

2515

2516
@pytest.fixture
def events_put_rule(aws_client):
    """Factory fixture wrapping ``events.put_rule``; tracks each created rule
    together with its event bus and deletes them — targets first — on
    teardown, logging (but not raising) individual failures."""
    created_rules = []

    def _put_rule(**kwargs):
        kwargs.setdefault("Name", f"rule-{short_uid()}")
        response = aws_client.events.put_rule(**kwargs)
        created_rules.append((kwargs["Name"], kwargs.get("EventBusName", "default")))
        return response

    yield _put_rule

    for rule_name, bus_name in created_rules:
        try:
            # a rule can only be deleted once its targets are removed
            listed = aws_client.events.list_targets_by_rule(
                Rule=rule_name, EventBusName=bus_name
            )
            for target_id in [target["Id"] for target in listed["Targets"]]:
                aws_client.events.remove_targets(
                    Rule=rule_name, EventBusName=bus_name, Ids=[target_id]
                )
            aws_client.events.delete_rule(Name=rule_name, EventBusName=bus_name)
        except Exception as e:
            LOG.warning("Failed to delete rule %s: %s", rule_name, e)
2547

2548

2549
@pytest.fixture
def events_create_rule(aws_client):
    """Factory fixture creating EventBridge rules from a simplified kwargs
    shape (``Name``, optional ``EventBusName``/``EventPattern``/``ScheduleExpression``).

    Returns the rule ARN. On teardown, targets are removed and the rules are
    deleted; failures are logged instead of aborting the remaining cleanup
    (consistent with the other events fixtures in this file).
    """
    rules = []

    def _create_rule(**kwargs):
        rule_name = kwargs["Name"]
        bus_name = kwargs.get("EventBusName", "")
        pattern = kwargs.get("EventPattern", {})
        schedule = kwargs.get("ScheduleExpression", "")
        rule_arn = aws_client.events.put_rule(
            Name=rule_name,
            EventBusName=bus_name,
            EventPattern=json.dumps(pattern),
            ScheduleExpression=schedule,
        )["RuleArn"]
        rules.append({"name": rule_name, "bus": bus_name})
        return rule_arn

    yield _create_rule

    for rule in rules:
        # fix: guard each rule's cleanup so one failure does not abort the
        # rest of the teardown (previously any error propagated out)
        try:
            targets = aws_client.events.list_targets_by_rule(
                Rule=rule["name"], EventBusName=rule["bus"]
            )["Targets"]

            # fix: snake_case local (was camelCase `targetIds`)
            target_ids = [target["Id"] for target in targets]
            if target_ids:
                aws_client.events.remove_targets(
                    Rule=rule["name"], EventBusName=rule["bus"], Ids=target_ids
                )

            aws_client.events.delete_rule(Name=rule["name"], EventBusName=rule["bus"])
        except Exception as e:
            LOG.warning("Failed to delete rule %s: %s", rule["name"], e)
2581

2582

2583
@pytest.fixture
def sqs_as_events_target(aws_client, sqs_get_queue_arn):
    """
    Fixture that creates an SQS queue and sets it up as a target for EventBridge events.

    The queue policy grants ``events.amazonaws.com`` permission to send
    messages to the queue. Returns ``(queue_url, queue_arn, queue_name)``;
    queues are deleted on teardown using the same client that created them.
    """
    queue_urls = []

    def _sqs_as_events_target(
        queue_name: str | None = None, custom_aws_client=None
    ) -> tuple[str, str, str]:  # fix: annotation said tuple[str, str] but three values are returned
        if not queue_name:
            queue_name = f"tests-queue-{short_uid()}"
        # allow cross-account/cross-region setups to supply their own client
        if custom_aws_client:
            sqs_client = custom_aws_client.sqs
        else:
            sqs_client = aws_client.sqs
        queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]
        # remember the creating client so teardown targets the right account/region
        queue_urls.append((queue_url, sqs_client))
        queue_arn = sqs_get_queue_arn(queue_url)
        policy = {
            "Version": "2012-10-17",
            "Id": f"sqs-eventbridge-{short_uid()}",
            "Statement": [
                {
                    "Sid": f"SendMessage-{short_uid()}",
                    "Effect": "Allow",
                    "Principal": {"Service": "events.amazonaws.com"},
                    "Action": "sqs:SendMessage",
                    "Resource": queue_arn,
                }
            ],
        }
        sqs_client.set_queue_attributes(
            QueueUrl=queue_url, Attributes={"Policy": json.dumps(policy)}
        )
        return queue_url, queue_arn, queue_name

    yield _sqs_as_events_target

    for queue_url, sqs_client in queue_urls:
        try:
            sqs_client.delete_queue(QueueUrl=queue_url)
        except Exception as e:
            LOG.debug("error cleaning up queue %s: %s", queue_url, e)
2627

2628

2629
@pytest.fixture
def create_role_event_bus_source_to_bus_target(create_iam_role_with_policy):
    """Factory fixture creating an IAM role that EventBridge can assume to
    forward events from a source event bus to a target event bus."""

    def _create_role_event_bus_to_bus():
        # trust policy: allow the EventBridge service to assume the role
        trust_policy = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {"Service": "events.amazonaws.com"},
                    "Action": "sts:AssumeRole",
                }
            ],
        }

        # permission policy: allow putting events onto any event bus
        permission_policy = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "",
                    "Effect": "Allow",
                    "Action": "events:PutEvents",
                    "Resource": "arn:aws:events:*:*:event-bus/*",
                }
            ],
        }

        return create_iam_role_with_policy(
            RoleDefinition=trust_policy,
            PolicyDefinition=permission_policy,
        )

    yield _create_role_event_bus_to_bus
2663

2664

2665
@pytest.fixture
def get_primary_secondary_client(
    aws_client_factory,
    secondary_aws_client_factory,
    region_name,
    secondary_region_name,
    account_id,
    secondary_account_id,
):
    def _get_primary_secondary_clients(cross_scenario: str):
        """
        Returns primary and secondary AWS clients based on the cross-scenario.
        :param cross_scenario: The scenario for cross-region or cross-account testing.
                               Options: "region", "account", "region_account"
                               account_region cross scenario is not supported by AWS
        :return: A dictionary containing primary and secondary AWS clients, and their respective region and account IDs.
        """
        if cross_scenario == "region":
            # same account, different region
            secondary_account = account_id
            secondary_region = secondary_region_name
            secondary_client = aws_client_factory(region_name=secondary_region_name)
        elif cross_scenario == "account":
            # different account, same region
            secondary_account = secondary_account_id
            secondary_region = region_name
            secondary_client = secondary_aws_client_factory(region_name=region_name)
        elif cross_scenario == "region_account":
            # different account and different region
            secondary_account = secondary_account_id
            secondary_region = secondary_region_name
            secondary_client = secondary_aws_client_factory(region_name=secondary_region)
        else:
            raise ValueError(f"cross_scenario {cross_scenario} not supported")

        return {
            "primary_aws_client": aws_client_factory(region_name=region_name),
            "secondary_aws_client": secondary_client,
            "secondary_region_name": secondary_region,
            "secondary_account_id": secondary_account,
        }

    return _get_primary_secondary_clients
2708

2709

2710
@pytest.fixture
def clean_up(
    aws_client,
):  # TODO: legacy clean up fixtures for eventbridge - remove and use individual fixtures for creating resources instead
    def _clean_up(
        bus_name=None,
        rule_name=None,
        target_ids=None,
        queue_url=None,
        log_group_name=None,
    ):
        """Best-effort removal of EventBridge-related test resources.

        Every deletion goes through ``call_safe``, so individual failures are
        logged and swallowed rather than failing the test teardown.

        :param bus_name: event bus the rule/targets live on; the bus itself is
            also deleted when given (default bus is used otherwise)
        :param rule_name: rule to delete (its targets are removed first)
        :param target_ids: a single target id or a list of ids to remove
        :param queue_url: SQS queue to delete
        :param log_group_name: CloudWatch Logs group to delete (streams first)
        """
        events_client = aws_client.events
        # omit EventBusName entirely to address the default bus
        kwargs = {"EventBusName": bus_name} if bus_name else {}
        if target_ids:
            # normalize to a list; targets must be removed before their rule
            target_ids = target_ids if isinstance(target_ids, list) else [target_ids]
            call_safe(
                events_client.remove_targets,
                kwargs=dict(Rule=rule_name, Ids=target_ids, Force=True, **kwargs),
            )
        if rule_name:
            call_safe(events_client.delete_rule, kwargs=dict(Name=rule_name, Force=True, **kwargs))
        if bus_name:
            call_safe(events_client.delete_event_bus, kwargs={"Name": bus_name})
        if queue_url:
            sqs_client = aws_client.sqs
            call_safe(sqs_client.delete_queue, kwargs={"QueueUrl": queue_url})
        if log_group_name:
            logs_client = aws_client.logs

            def _delete_log_group():
                # all streams must be deleted before the group itself
                log_streams = logs_client.describe_log_streams(logGroupName=log_group_name)
                for log_stream in log_streams["logStreams"]:
                    logs_client.delete_log_stream(
                        logGroupName=log_group_name, logStreamName=log_stream["logStreamName"]
                    )
                logs_client.delete_log_group(logGroupName=log_group_name)

            call_safe(_delete_log_group)

    yield _clean_up
2750

2751

2752
@pytest.fixture(params=["dill", "jsonpickle"])
1✔
2753
def patch_default_encoder(request, monkeypatch):
1✔
2754
    backend = request.param
1✔
2755
    monkeypatch.setattr(config, "STATE_SERIALIZATION_BACKEND", backend)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc