• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 64e19109-9edf-4c7b-b161-ee843a26f61d

20 May 2025 12:52PM UTC coverage: 86.628% (-0.03%) from 86.655%
64e19109-9edf-4c7b-b161-ee843a26f61d

push

circleci

web-flow
Activate new GHA pipeline on PRs (#12648)

64477 of 74430 relevant lines covered (86.63%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.45
/localstack-core/localstack/testing/pytest/fixtures.py
1
import contextlib
1✔
2
import dataclasses
1✔
3
import json
1✔
4
import logging
1✔
5
import os
1✔
6
import re
1✔
7
import textwrap
1✔
8
import time
1✔
9
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple
1✔
10

11
import botocore.auth
1✔
12
import botocore.config
1✔
13
import botocore.credentials
1✔
14
import botocore.session
1✔
15
import pytest
1✔
16
from _pytest.config import Config
1✔
17
from _pytest.nodes import Item
1✔
18
from botocore.exceptions import ClientError
1✔
19
from botocore.regions import EndpointResolver
1✔
20
from pytest_httpserver import HTTPServer
1✔
21
from werkzeug import Request, Response
1✔
22

23
from localstack import config
1✔
24
from localstack.aws.api.ec2 import CreateSecurityGroupRequest
1✔
25
from localstack.aws.connect import ServiceLevelClientFactory
1✔
26
from localstack.services.stores import (
1✔
27
    AccountRegionBundle,
28
    BaseStore,
29
    CrossAccountAttribute,
30
    CrossRegionAttribute,
31
    LocalAttribute,
32
)
33
from localstack.testing.aws.cloudformation_utils import load_template_file, render_template
1✔
34
from localstack.testing.aws.util import get_lambda_logs, is_aws_cloud
1✔
35
from localstack.testing.config import (
1✔
36
    SECONDARY_TEST_AWS_ACCOUNT_ID,
37
    SECONDARY_TEST_AWS_REGION_NAME,
38
    TEST_AWS_ACCOUNT_ID,
39
    TEST_AWS_REGION_NAME,
40
)
41
from localstack.utils import testutil
1✔
42
from localstack.utils.aws.arns import get_partition
1✔
43
from localstack.utils.aws.client import SigningHttpClient
1✔
44
from localstack.utils.aws.resources import create_dynamodb_table
1✔
45
from localstack.utils.bootstrap import is_api_enabled
1✔
46
from localstack.utils.collections import ensure_list, select_from_typed_dict
1✔
47
from localstack.utils.functions import call_safe, run_safe
1✔
48
from localstack.utils.http import safe_requests as requests
1✔
49
from localstack.utils.id_generator import ResourceIdentifier, localstack_id_manager
1✔
50
from localstack.utils.json import CustomEncoder, json_safe
1✔
51
from localstack.utils.net import wait_for_port_open
1✔
52
from localstack.utils.strings import short_uid, to_str
1✔
53
from localstack.utils.sync import ShortCircuitWaitException, poll_condition, retry, wait_until
1✔
54

55
# Module-level logger used by all fixtures in this file.
LOG = logging.getLogger(__name__)

# URL of public HTTP echo server, used primarily for AWS parity/snapshot testing
PUBLIC_HTTP_ECHO_SERVER_URL = "http://httpbin.org"

# Names of botocore waiters for CloudFormation change-set / stack lifecycle states.
WAITER_CHANGE_SET_CREATE_COMPLETE = "change_set_create_complete"
WAITER_STACK_CREATE_COMPLETE = "stack_create_complete"
WAITER_STACK_UPDATE_COMPLETE = "stack_update_complete"
WAITER_STACK_DELETE_COMPLETE = "stack_delete_complete"


# Typing-only imports: avoids a hard runtime dependency on mypy_boto3_sqs.
if TYPE_CHECKING:
    from mypy_boto3_sqs import SQSClient
    from mypy_boto3_sqs.type_defs import MessageTypeDef
69

70

71
@pytest.fixture(scope="session")
def aws_client_no_retry(aws_client_factory):
    """
    Session-scoped Boto client collection with retries disabled (``max_attempts=1``).
    botocore docs: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#configuring-a-retry-mode

    Prefer this client when asserting on exceptions (i.e., with pytest.raises(...)) or
    on expected error responses (e.g., status code 500), so the retry machinery does
    not add delays or flakiness when the tested error condition is time-bound.

    It matters for everything the legacy retry mode retries
    (https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#legacy-retry-mode):

    General socket/connection errors:
    * ConnectionError
    * ConnectionClosedError
    * ReadTimeoutError
    * EndpointConnectionError

    Service-side throttling/limit errors and exceptions:
    * Throttling
    * ThrottlingException
    * ThrottledException
    * RequestThrottledException
    * ProvisionedThroughputExceededException

    HTTP status codes: 429, 500, 502, 503, 504, and 509

    A plain `ResourceNotFound` error is never retried, so this client is not strictly
    needed there (but it doesn't harm).
    """
    return aws_client_factory(config=botocore.config.Config(retries={"max_attempts": 1}))
101

102

103
@pytest.fixture(scope="class")
def aws_http_client_factory(aws_session):
    """
    Returns a factory for ``SigningHttpClient`` instances backed by a configurable
    botocore request signer (default: ``SigV4QueryAuth``). Credentials are taken from
    the ``aws_session`` fixture, which transparently uses your global profile when
    TEST_TARGET=AWS_CLOUD, or test credentials when running against LocalStack.

    Example invocations

        client = aws_signing_http_client_factory("sqs")
        client.get("http://localhost:4566/000000000000/my-queue")

    or
        client = aws_signing_http_client_factory("dynamodb", signer_factory=SigV4Auth)
        client.post("...")
    """

    def factory(
        service: str,
        region: str = None,
        signer_factory: Callable[
            [botocore.credentials.Credentials, str, str], botocore.auth.BaseSigner
        ] = botocore.auth.SigV4QueryAuth,
        endpoint_url: str = None,
        aws_access_key_id: str = None,
        aws_secret_access_key: str = None,
    ):
        region = region or TEST_AWS_REGION_NAME

        # explicitly passed credentials take precedence over the session's credentials
        if aws_access_key_id or aws_secret_access_key:
            credentials = botocore.credentials.Credentials(
                access_key=aws_access_key_id, secret_key=aws_secret_access_key
            )
        else:
            credentials = aws_session.get_credentials()
        frozen_creds = credentials.get_frozen_credentials()

        if not endpoint_url:
            if is_aws_cloud():
                # FIXME: this is a bit raw. we should probably re-use boto in a better way
                resolver: EndpointResolver = aws_session._session.get_component("endpoint_resolver")
                endpoint_url = "https://" + resolver.construct_endpoint(service, region)["hostname"]
            else:
                endpoint_url = config.internal_service_url()

        signer = signer_factory(frozen_creds, service, region)
        return SigningHttpClient(signer, endpoint_url=endpoint_url)

    return factory
153

154

155
@pytest.fixture(scope="class")
def s3_vhost_client(aws_client_factory, region_name):
    """S3 client configured for virtual-host style bucket addressing."""
    vhost_config = botocore.config.Config(s3={"addressing_style": "virtual"})
    return aws_client_factory(config=vhost_config, region_name=region_name).s3
160

161

162
@pytest.fixture
def dynamodb_wait_for_table_active(aws_client):
    """Returns a callable that blocks (up to 30s) until a DynamoDB table is ACTIVE."""

    def wait_for_table_active(table_name: str, client=None):
        def _is_active():
            ddb = client or aws_client.dynamodb
            table = ddb.describe_table(TableName=table_name)["Table"]
            return table["TableStatus"] == "ACTIVE"

        poll_condition(_is_active, timeout=30)

    return wait_for_table_active
173

174

175
@pytest.fixture
def dynamodb_create_table_with_parameters(dynamodb_wait_for_table_active, aws_client):
    """Factory fixture: create DynamoDB tables from raw ``create_table`` kwargs,
    wait until they are ACTIVE, and delete them again on teardown."""
    created_tables = []

    def factory(**kwargs):
        if "TableName" not in kwargs:
            kwargs["TableName"] = f"test-table-{short_uid()}"
        created_tables.append(kwargs["TableName"])
        response = aws_client.dynamodb.create_table(**kwargs)
        dynamodb_wait_for_table_active(kwargs["TableName"])
        return response

    yield factory

    # cleanup
    for table_name in created_tables:
        try:
            # table has to be in ACTIVE state before deletion
            dynamodb_wait_for_table_active(table_name)
            aws_client.dynamodb.delete_table(TableName=table_name)
        except Exception as e:
            LOG.debug("error cleaning up table %s: %s", table_name, e)
198

199

200
@pytest.fixture
def dynamodb_create_table(dynamodb_wait_for_table_active, aws_client):
    """Factory fixture around the ``create_dynamodb_table`` utility with automatic
    cleanup. Beware: the utility may swallow exceptions raised during creation."""
    created_tables = []

    def factory(**kwargs):
        kwargs["client"] = aws_client.dynamodb
        if "table_name" not in kwargs:
            kwargs["table_name"] = f"test-table-{short_uid()}"
        if "partition_key" not in kwargs:
            kwargs["partition_key"] = "id"
        created_tables.append(kwargs["table_name"])
        return create_dynamodb_table(**kwargs)

    yield factory

    # cleanup
    for table_name in created_tables:
        try:
            # table has to be in ACTIVE state before deletion
            dynamodb_wait_for_table_active(table_name)
            aws_client.dynamodb.delete_table(TableName=table_name)
        except Exception as e:
            LOG.debug("error cleaning up table %s: %s", table_name, e)
226

227

228
@pytest.fixture
def s3_create_bucket(s3_empty_bucket, aws_client):
    """Factory fixture: create S3 buckets (region-aware) and remove them on teardown;
    returns the bucket name."""
    created_buckets = []

    def factory(**kwargs) -> str:
        if "Bucket" not in kwargs:
            kwargs["Bucket"] = "test-bucket-%s" % short_uid()

        # outside us-east-1, a LocationConstraint must be supplied explicitly
        region = aws_client.s3.meta.region_name
        if "CreateBucketConfiguration" not in kwargs and region != "us-east-1":
            kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}

        aws_client.s3.create_bucket(**kwargs)
        created_buckets.append(kwargs["Bucket"])
        return kwargs["Bucket"]

    yield factory

    # cleanup
    for bucket_name in created_buckets:
        try:
            s3_empty_bucket(bucket_name)
            aws_client.s3.delete_bucket(Bucket=bucket_name)
        except Exception as e:
            LOG.debug("error cleaning up bucket %s: %s", bucket_name, e)
257

258

259
@pytest.fixture
def s3_create_bucket_with_client(s3_empty_bucket, aws_client):
    """Factory fixture like ``s3_create_bucket`` but using a caller-supplied S3 client
    for creation; cleanup always runs through the default ``aws_client``.
    Returns the full ``create_bucket`` response."""
    created_buckets = []

    def factory(s3_client, **kwargs) -> str:
        if "Bucket" not in kwargs:
            kwargs["Bucket"] = f"test-bucket-{short_uid()}"
        response = s3_client.create_bucket(**kwargs)
        created_buckets.append(kwargs["Bucket"])
        return response

    yield factory

    # cleanup
    for bucket_name in created_buckets:
        try:
            s3_empty_bucket(bucket_name)
            aws_client.s3.delete_bucket(Bucket=bucket_name)
        except Exception as e:
            LOG.debug("error cleaning up bucket %s: %s", bucket_name, e)
280

281

282
@pytest.fixture
def s3_bucket(s3_create_bucket, aws_client) -> str:
    """A ready-made S3 bucket in the client's region; returns the bucket name."""
    region = aws_client.s3.meta.region_name
    if region == "us-east-1":
        create_kwargs = {}
    else:
        create_kwargs = {"CreateBucketConfiguration": {"LocationConstraint": region}}
    return s3_create_bucket(**create_kwargs)
289

290

291
@pytest.fixture
def s3_empty_bucket(aws_client):
    """
    Returns a factory that given a bucket name, deletes all objects and deletes all object versions
    """

    # Boto resource would make this a straightforward task, but our internal client does not support Boto resource
    # FIXME: this won't work when bucket has more than 1000 objects
    def factory(bucket_name: str):
        delete_kwargs = {}
        try:
            # buckets with object lock need BypassGovernanceRetention for deletes to succeed
            aws_client.s3.get_object_lock_configuration(Bucket=bucket_name)
            delete_kwargs["BypassGovernanceRetention"] = True
        except ClientError:
            pass

        listing = aws_client.s3.list_objects_v2(Bucket=bucket_name)
        current_objects = [{"Key": entry["Key"]} for entry in listing.get("Contents", [])]
        if current_objects:
            aws_client.s3.delete_objects(
                Bucket=bucket_name,
                Delete={"Objects": current_objects},
                **delete_kwargs,
            )

        version_listing = aws_client.s3.list_object_versions(Bucket=bucket_name)
        all_versions = version_listing.get("Versions", [])
        all_versions.extend(version_listing.get("DeleteMarkers", []))

        stale_versions = [{"Key": v["Key"], "VersionId": v["VersionId"]} for v in all_versions]
        if stale_versions:
            aws_client.s3.delete_objects(
                Bucket=bucket_name,
                Delete={"Objects": stale_versions},
                **delete_kwargs,
            )

    yield factory
329

330

331
@pytest.fixture
def sqs_create_queue(aws_client):
    """Factory fixture: create SQS queues and delete them on teardown; returns queue URLs."""
    created_queue_urls = []

    def factory(**kwargs):
        if "QueueName" not in kwargs:
            kwargs["QueueName"] = "test-queue-%s" % short_uid()

        queue_url = aws_client.sqs.create_queue(**kwargs)["QueueUrl"]
        created_queue_urls.append(queue_url)
        return queue_url

    yield factory

    # cleanup
    for queue_url in created_queue_urls:
        try:
            aws_client.sqs.delete_queue(QueueUrl=queue_url)
        except Exception as e:
            LOG.debug("error cleaning up queue %s: %s", queue_url, e)
353

354

355
@pytest.fixture
def sqs_receive_messages_delete(aws_client):
    """Returns a factory that receives messages from a queue (JSON-decoding each body),
    deletes them, and optionally asserts how many were received.

    NOTE: raises ``KeyError`` when the response contains no ``Messages`` key —
    callers such as ``sqs_receive_num_messages`` rely on that behavior.
    """

    def factory(
        queue_url: str,
        expected_messages: Optional[int] = None,
        wait_time: Optional[int] = 5,
    ):
        response = aws_client.sqs.receive_message(
            QueueUrl=queue_url,
            MessageAttributeNames=["All"],
            VisibilityTimeout=0,
            WaitTimeSeconds=wait_time,
        )
        # deliberately indexes "Messages" directly so an empty receive raises KeyError
        decoded_bodies = [json.loads(to_str(m["Body"])) for m in response["Messages"]]

        if expected_messages is not None:
            assert len(decoded_bodies) == expected_messages

        for received in response["Messages"]:
            aws_client.sqs.delete_message(
                QueueUrl=queue_url, ReceiptHandle=received["ReceiptHandle"]
            )

        return decoded_bodies

    return factory
384

385

386
@pytest.fixture
def sqs_receive_num_messages(sqs_receive_messages_delete):
    """Returns a factory that polls a queue until ``expected_messages`` messages were
    collected (at most ``max_iterations`` receive calls), raising AssertionError otherwise."""

    def factory(queue_url: str, expected_messages: int, max_iterations: int = 3):
        all_messages = []
        for _ in range(max_iterations):
            try:
                batch = sqs_receive_messages_delete(queue_url, wait_time=5)
            except KeyError:
                # there were no messages
                continue
            all_messages.extend(batch)

            if len(all_messages) >= expected_messages:
                return all_messages[:expected_messages]

        raise AssertionError(f"max iterations reached with {len(all_messages)} messages received")

    return factory
404

405

406
@pytest.fixture
def sqs_collect_messages(aws_client):
    """Collects SQS messages from a given queue_url and deletes them by default.
    Example usage:
    messages = sqs_collect_messages(
         my_queue_url,
         expected=2,
         timeout=10,
         attribute_names=["All"],
         message_attribute_names=["All"],
    )
    """

    def factory(
        queue_url: str,
        expected: int,
        timeout: int,
        delete: bool = True,
        attribute_names: list[str] = None,
        message_attribute_names: list[str] = None,
        max_number_of_messages: int = 1,
        wait_time_seconds: int = 5,
        sqs_client: "SQSClient | None" = None,
    ) -> list["MessageTypeDef"]:
        client = sqs_client or aws_client.sqs
        collected = []

        def _poll_once():
            response = client.receive_message(
                QueueUrl=queue_url,
                # Maximum is 20 seconds. Performs long polling.
                WaitTimeSeconds=wait_time_seconds,
                # Maximum 10 messages
                MaxNumberOfMessages=max_number_of_messages,
                AttributeNames=attribute_names or [],
                MessageAttributeNames=message_attribute_names or [],
            )

            batch = response.get("Messages")
            if batch:
                collected.extend(batch)
                if delete:
                    for msg in batch:
                        client.delete_message(
                            QueueUrl=queue_url, ReceiptHandle=msg["ReceiptHandle"]
                        )

            return len(collected) >= expected

        if not poll_condition(_poll_once, timeout=timeout):
            raise TimeoutError(
                f"gave up waiting for messages (expected={expected}, actual={len(collected)}"
            )

        return collected

    yield factory
463

464

465
@pytest.fixture
def sqs_queue(sqs_create_queue):
    """A ready-made SQS queue; returns its queue URL."""
    queue_url = sqs_create_queue()
    return queue_url
468

469

470
@pytest.fixture
def sqs_get_queue_arn(aws_client) -> Callable:
    """Returns a callable that resolves a queue URL to its QueueArn attribute."""

    def _get_queue_arn(queue_url: str) -> str:
        attributes = aws_client.sqs.get_queue_attributes(
            QueueUrl=queue_url, AttributeNames=["QueueArn"]
        )["Attributes"]
        return attributes["QueueArn"]

    return _get_queue_arn
478

479

480
@pytest.fixture
def sqs_queue_exists(aws_client):
    def _queue_exists(queue_url: str) -> bool:
        """
        Checks whether a queue with the given queue URL exists.
        :param queue_url: the queue URL
        :return: true if the queue exists, false otherwise
        """
        # the queue name is the last path segment of the URL
        queue_name = queue_url.split("/")[-1]
        try:
            result = aws_client.sqs.get_queue_url(QueueName=queue_name)
        except ClientError as e:
            if "NonExistentQueue" in e.response["Error"]["Code"]:
                return False
            raise
        return result.get("QueueUrl") == queue_url

    yield _queue_exists
497

498

499
@pytest.fixture
def sns_create_topic(aws_client):
    """Factory fixture: create SNS topics and delete them on teardown;
    returns the full ``create_topic`` response."""
    created_topic_arns = []

    def _create_topic(**kwargs):
        if "Name" not in kwargs:
            kwargs["Name"] = "test-topic-%s" % short_uid()
        response = aws_client.sns.create_topic(**kwargs)
        created_topic_arns.append(response["TopicArn"])
        return response

    yield _create_topic

    for topic_arn in created_topic_arns:
        try:
            aws_client.sns.delete_topic(TopicArn=topic_arn)
        except Exception as e:
            LOG.debug("error cleaning up topic %s: %s", topic_arn, e)
517

518

519
@pytest.fixture
def sns_wait_for_topic_delete(aws_client):
    """Returns a callable that blocks (up to 30s) until an SNS topic is deleted.

    The topic counts as deleted once ``get_topic_attributes`` raises a client error
    whose error code contains "NotFound"; any other error is re-raised.
    """

    def wait_for_topic_delete(topic_arn: str) -> None:
        def wait():
            try:
                aws_client.sns.get_topic_attributes(TopicArn=topic_arn)
                return False
            except ClientError as e:
                # fix: only botocore ClientError carries `.response`; the previous
                # broad `except Exception` raised AttributeError for other errors
                if "NotFound" in e.response["Error"]["Code"]:
                    return True

                raise

        poll_condition(wait, timeout=30)

    return wait_for_topic_delete
535

536

537
@pytest.fixture
def sns_subscription(aws_client):
    """Factory fixture: create SNS subscriptions (returning the full ARN by default)
    and unsubscribe them on teardown."""
    created_sub_arns = []

    def _create_sub(**kwargs):
        if kwargs.get("ReturnSubscriptionArn") is None:
            kwargs["ReturnSubscriptionArn"] = True

        # requires 'TopicArn', 'Protocol', and 'Endpoint'
        response = aws_client.sns.subscribe(**kwargs)
        created_sub_arns.append(response["SubscriptionArn"])
        return response

    yield _create_sub

    for sub_arn in created_sub_arns:
        try:
            aws_client.sns.unsubscribe(SubscriptionArn=sub_arn)
        except Exception as e:
            LOG.debug("error cleaning up subscription %s: %s", sub_arn, e)
558

559

560
@pytest.fixture
def sns_topic(sns_create_topic, aws_client):
    """A ready-made SNS topic; returns its ``get_topic_attributes`` response."""
    created = sns_create_topic()
    return aws_client.sns.get_topic_attributes(TopicArn=created["TopicArn"])
564

565

566
@pytest.fixture
def sns_allow_topic_sqs_queue(aws_client):
    """Returns a callable that grants an SNS topic permission to send to an SQS queue."""

    def _allow_sns_topic(sqs_queue_url, sqs_queue_arn, sns_topic_arn) -> None:
        # allow topic to write to sqs queue
        policy = {
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {"Service": "sns.amazonaws.com"},
                    "Action": "sqs:SendMessage",
                    "Resource": sqs_queue_arn,
                    "Condition": {"ArnEquals": {"aws:SourceArn": sns_topic_arn}},
                }
            ]
        }
        aws_client.sqs.set_queue_attributes(
            QueueUrl=sqs_queue_url,
            Attributes={"Policy": json.dumps(policy)},
        )

    return _allow_sns_topic
590

591

592
@pytest.fixture
def sns_create_sqs_subscription(sns_allow_topic_sqs_queue, sqs_get_queue_arn, aws_client):
    """Factory fixture: subscribe an SQS queue to an SNS topic (including the queue
    access policy) and unsubscribe on teardown; returns the subscription attributes."""
    subscriptions = []

    def _factory(topic_arn: str, queue_url: str, **kwargs) -> Dict[str, str]:
        queue_arn = sqs_get_queue_arn(queue_url)

        # connect sns topic to sqs
        subscription = aws_client.sns.subscribe(
            TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn, **kwargs
        )
        subscription_arn = subscription["SubscriptionArn"]

        # allow topic to write to sqs queue
        sns_allow_topic_sqs_queue(
            sqs_queue_url=queue_url, sqs_queue_arn=queue_arn, sns_topic_arn=topic_arn
        )

        subscriptions.append(subscription_arn)
        return aws_client.sns.get_subscription_attributes(SubscriptionArn=subscription_arn)[
            "Attributes"
        ]

    yield _factory

    for arn in subscriptions:
        try:
            aws_client.sns.unsubscribe(SubscriptionArn=arn)
        except Exception as e:
            # fix: log at debug level, consistent with every other cleanup loop in this file
            LOG.debug("error cleaning up subscription %s: %s", arn, e)
622

623

624
@pytest.fixture
def sns_create_http_endpoint(sns_create_topic, sns_subscription, aws_client):
    # This fixture can be used with manual setup to expose the HTTPServer fixture to AWS. One example is to use a
    # a service like localhost.run, and set up a specific port to start the `HTTPServer(port=40000)` for example,
    # and tunnel `localhost:40000` to a specific domain that you can manually return from this fixture.
    started_servers = []

    def _create_http_endpoint(
        raw_message_delivery: bool = False,
    ) -> Tuple[str, str, str, HTTPServer]:
        http_server = HTTPServer()
        http_server.start()
        started_servers.append(http_server)
        http_server.expect_request("/sns-endpoint").respond_with_data(status=200)
        endpoint_url = http_server.url_for("/sns-endpoint")
        wait_for_port_open(endpoint_url)

        topic_arn = sns_create_topic()["TopicArn"]
        subscription = sns_subscription(TopicArn=topic_arn, Protocol="http", Endpoint=endpoint_url)
        subscription_arn = subscription["SubscriptionArn"]

        # retry as little as possible so HTTP delivery tests stay fast
        delivery_policy = {
            "healthyRetryPolicy": {
                "minDelayTarget": 1,
                "maxDelayTarget": 1,
                "numRetries": 0,
                "numNoDelayRetries": 0,
                "numMinDelayRetries": 0,
                "numMaxDelayRetries": 0,
                "backoffFunction": "linear",
            },
            "sicklyRetryPolicy": None,
            "throttlePolicy": {"maxReceivesPerSecond": 1000},
            "guaranteed": False,
        }
        aws_client.sns.set_subscription_attributes(
            SubscriptionArn=subscription_arn,
            AttributeName="DeliveryPolicy",
            AttributeValue=json.dumps(delivery_policy),
        )

        if raw_message_delivery:
            aws_client.sns.set_subscription_attributes(
                SubscriptionArn=subscription_arn,
                AttributeName="RawMessageDelivery",
                AttributeValue="true",
            )

        return topic_arn, subscription_arn, endpoint_url, http_server

    yield _create_http_endpoint

    for server in started_servers:
        if server.is_running():
            server.stop()
678

679

680
@pytest.fixture
def route53_hosted_zone(aws_client):
    """Factory fixture: create Route53 hosted zones and delete them on teardown.

    Fix: forward *all* keyword arguments to ``create_hosted_zone`` instead of only
    ``Name`` and ``CallerReference`` — previously any extra argument (e.g.
    ``HostedZoneConfig``, ``VPC``) was silently dropped.
    """
    hosted_zones = []

    def factory(**kwargs):
        if "Name" not in kwargs:
            kwargs["Name"] = f"www.{short_uid()}.com."
        if "CallerReference" not in kwargs:
            kwargs["CallerReference"] = f"caller-ref-{short_uid()}"
        response = aws_client.route53.create_hosted_zone(**kwargs)
        hosted_zones.append(response["HostedZone"]["Id"])
        return response

    yield factory

    for zone in hosted_zones:
        try:
            aws_client.route53.delete_hosted_zone(Id=zone)
        except Exception as e:
            LOG.debug("error cleaning up route53 HostedZone %s: %s", zone, e)
702

703

704
@pytest.fixture
def transcribe_create_job(s3_bucket, aws_client):
    """Factory fixture: upload an audio file to S3, start a transcription job for it,
    and delete the job on teardown; returns the job name."""
    job_names = []

    def _create_job(audio_file: str, params: Optional[dict[str, Any]] = None) -> str:
        s3_key = "test-clip.wav"

        params = params or {}

        if "TranscriptionJobName" not in params:
            params["TranscriptionJobName"] = f"test-transcribe-{short_uid()}"
        if "LanguageCode" not in params:
            params["LanguageCode"] = "en-GB"
        if "Media" not in params:
            params["Media"] = {"MediaFileUri": f"s3://{s3_bucket}/{s3_key}"}

        # upload test wav to a s3 bucket
        with open(audio_file, "rb") as f:
            aws_client.s3.upload_fileobj(f, s3_bucket, s3_key)

        response = aws_client.transcribe.start_transcription_job(**params)
        job_name = response["TranscriptionJob"]["TranscriptionJobName"]
        job_names.append(job_name)
        return job_name

    yield _create_job

    for job_name in job_names:
        with contextlib.suppress(ClientError):
            aws_client.transcribe.delete_transcription_job(TranscriptionJobName=job_name)
739

740

741
@pytest.fixture
def kinesis_create_stream(aws_client):
    """Factory fixture: create Kinesis streams and delete them (with consumers) on teardown;
    returns the stream name."""
    created_streams = []

    def _create_stream(**kwargs):
        if "StreamName" not in kwargs:
            kwargs["StreamName"] = f"test-stream-{short_uid()}"
        stream_name = kwargs["StreamName"]
        aws_client.kinesis.create_stream(**kwargs)
        created_streams.append(stream_name)
        return stream_name

    yield _create_stream

    for stream_name in created_streams:
        try:
            aws_client.kinesis.delete_stream(StreamName=stream_name, EnforceConsumerDeletion=True)
        except Exception as e:
            LOG.debug("error cleaning up kinesis stream %s: %s", stream_name, e)
759

760

761
@pytest.fixture
def wait_for_stream_ready(aws_client):
    """Returns a callable that polls until a Kinesis stream is ACTIVE or UPDATING;
    returns the poll result (True if ready in time)."""

    def _wait_for_stream_ready(stream_name: str):
        def is_stream_ready():
            description = aws_client.kinesis.describe_stream(StreamName=stream_name)
            status = description["StreamDescription"]["StreamStatus"]
            return status in ("ACTIVE", "UPDATING")

        return poll_condition(is_stream_ready)

    return _wait_for_stream_ready
774

775

776
@pytest.fixture
def wait_for_delivery_stream_ready(aws_client):
    """Returns a callable that polls until a Firehose delivery stream is ACTIVE.

    Fix: return the ``poll_condition`` result (True if the stream became ACTIVE within
    the timeout), consistent with ``wait_for_stream_ready`` and
    ``wait_for_dynamodb_stream_ready``; previously the result was silently discarded.
    """

    def _wait_for_stream_ready(delivery_stream_name: str):
        def is_stream_ready():
            describe_stream_response = aws_client.firehose.describe_delivery_stream(
                DeliveryStreamName=delivery_stream_name
            )
            return (
                describe_stream_response["DeliveryStreamDescription"]["DeliveryStreamStatus"]
                == "ACTIVE"
            )

        return poll_condition(is_stream_ready)

    return _wait_for_stream_ready
791

792

793
@pytest.fixture
def wait_for_dynamodb_stream_ready(aws_client):
    """Returns a callable that polls until a DynamoDB stream is ENABLED;
    returns the poll result (True if ready in time)."""

    def _wait_for_stream_ready(stream_arn: str):
        def is_stream_ready():
            description = aws_client.dynamodbstreams.describe_stream(StreamArn=stream_arn)
            return description["StreamDescription"]["StreamStatus"] == "ENABLED"

        return poll_condition(is_stream_ready)

    return _wait_for_stream_ready
805

806

807
@pytest.fixture()
def kms_create_key(aws_client_factory):
    """Factory fixture: create KMS keys (optionally in a specific region) and schedule
    their deletion on teardown; returns the key metadata."""
    created_keys = []

    def _create_key(region_name: str = None, **kwargs):
        if "Description" not in kwargs:
            kwargs["Description"] = f"test description - {short_uid()}"
        kms = aws_client_factory(region_name=region_name).kms
        key_metadata = kms.create_key(**kwargs)["KeyMetadata"]
        created_keys.append((region_name, key_metadata["KeyId"]))
        return key_metadata

    yield _create_key

    for region_name, key_id in created_keys:
        try:
            # shortest amount of time you can schedule the deletion
            aws_client_factory(region_name=region_name).kms.schedule_key_deletion(
                KeyId=key_id, PendingWindowInDays=7
            )
        except Exception as e:
            exception_message = str(e)
            # Some tests schedule their keys for deletion themselves.
            if (
                "KMSInvalidStateException" not in exception_message
                or "is pending deletion" not in exception_message
            ):
                LOG.debug("error cleaning up KMS key %s: %s", key_id, e)
836

837

838
@pytest.fixture()
def kms_replicate_key(aws_client_factory):
    """Factory fixture to replicate a KMS key into another region; replicas are scheduled
    for deletion on teardown."""
    replicas: list[tuple[str, str]] = []

    def _replicate_key(region_from=None, **kwargs):
        replicas.append((kwargs.get("ReplicaRegion"), kwargs.get("KeyId")))
        return aws_client_factory(region_name=region_from).kms.replicate_key(**kwargs)

    yield _replicate_key

    for region_to, key_id in replicas:
        try:
            # 7 days is the shortest amount of time you can schedule the deletion
            aws_client_factory(region_name=region_to).kms.schedule_key_deletion(
                KeyId=key_id, PendingWindowInDays=7
            )
        except Exception as e:
            LOG.debug("error cleaning up KMS key %s: %s", key_id, e)

858

859
# kms_create_key fixture is used here not just to be able to create aliases without a key specified,
860
# but also to make sure that kms_create_key gets executed before and teared down after kms_create_alias -
861
# to make sure that we clean up aliases before keys get cleaned up.
862
@pytest.fixture()
1✔
863
def kms_create_alias(kms_create_key, aws_client):
1✔
864
    aliases = []
1✔
865

866
    def _create_alias(**kwargs):
1✔
867
        if "AliasName" not in kwargs:
1✔
868
            kwargs["AliasName"] = f"alias/{short_uid()}"
1✔
869
        if "TargetKeyId" not in kwargs:
1✔
870
            kwargs["TargetKeyId"] = kms_create_key()["KeyId"]
1✔
871

872
        aws_client.kms.create_alias(**kwargs)
1✔
873
        aliases.append(kwargs["AliasName"])
1✔
874
        return kwargs["AliasName"]
1✔
875

876
    yield _create_alias
1✔
877

878
    for alias in aliases:
1✔
879
        try:
1✔
880
            aws_client.kms.delete_alias(AliasName=alias)
1✔
881
        except Exception as e:
1✔
882
            LOG.debug("error cleaning up KMS alias %s: %s", alias, e)
1✔
883

884

885
@pytest.fixture()
def kms_create_grant(kms_create_key, aws_client):
    """Factory fixture for KMS grants; retires all created grants on teardown."""
    created_grants = []

    def _create_grant(**kwargs):
        # Just a random ARN, since KMS in LocalStack currently doesn't validate GranteePrincipal,
        # but some GranteePrincipal is required to create a grant.
        grantee_principal_arn = (
            "arn:aws:kms:eu-central-1:123456789876:key/198a5a78-52c3-489f-ac70-b06a4d11027a"
        )

        kwargs.setdefault("Operations", ["Decrypt", "Encrypt"])
        kwargs.setdefault("GranteePrincipal", grantee_principal_arn)
        if "KeyId" not in kwargs:
            kwargs["KeyId"] = kms_create_key()["KeyId"]
        key_id = kwargs["KeyId"]

        grant_id = aws_client.kms.create_grant(**kwargs)["GrantId"]
        created_grants.append((grant_id, key_id))
        return grant_id, key_id

    yield _create_grant

    for grant_id, key_id in created_grants:
        try:
            aws_client.kms.retire_grant(GrantId=grant_id, KeyId=key_id)
        except Exception as e:
            LOG.debug("error cleaning up KMS grant %s: %s", grant_id, e)

915

916
@pytest.fixture
def kms_key(kms_create_key):
    """A single freshly created KMS key (the KeyMetadata dict from create_key)."""
    return kms_create_key()

920

921
@pytest.fixture
def kms_grant_and_key(kms_key, aws_client):
    """A [grant-response, key-metadata] pair where the grant lets the current caller
    Encrypt/Decrypt with the test key."""
    caller_arn = aws_client.sts.get_caller_identity()["Arn"]

    grant = aws_client.kms.create_grant(
        KeyId=kms_key["KeyId"],
        GranteePrincipal=caller_arn,
        Operations=["Decrypt", "Encrypt"],
    )
    return [grant, kms_key]

934

935
@pytest.fixture
def opensearch_wait_for_cluster(aws_client):
    """Fixture returning a waiter that blocks until an OpenSearch domain has finished
    processing and exposes an endpoint."""

    def _wait_for_cluster(domain_name: str):
        def finished_processing():
            status = aws_client.opensearch.describe_domain(DomainName=domain_name)["DomainStatus"]
            return status["Processing"] is False and "Endpoint" in status

        # on real AWS, poll less aggressively (10s interval); domains can take many minutes
        poll_kwargs = {"interval": 10} if is_aws_cloud() else {}
        assert poll_condition(finished_processing, timeout=25 * 60, **poll_kwargs), (
            f"could not start domain: {domain_name}"
        )

    return _wait_for_cluster

948

949
@pytest.fixture
def opensearch_create_domain(opensearch_wait_for_cluster, aws_client):
    """Factory fixture that creates OpenSearch domains, waits for them to come up,
    and deletes them again on teardown."""
    created_domains = []

    def factory(**kwargs) -> str:
        if "DomainName" not in kwargs:
            kwargs["DomainName"] = f"test-domain-{short_uid()}"
        domain_name = kwargs["DomainName"]

        aws_client.opensearch.create_domain(**kwargs)

        opensearch_wait_for_cluster(domain_name=domain_name)

        created_domains.append(domain_name)
        return domain_name

    yield factory

    # cleanup
    for domain_name in created_domains:
        try:
            aws_client.opensearch.delete_domain(DomainName=domain_name)
        except Exception as e:
            LOG.debug("error cleaning up domain %s: %s", domain_name, e)

973

974
@pytest.fixture
def opensearch_domain(opensearch_create_domain) -> str:
    """Name of a freshly created OpenSearch domain."""
    return opensearch_create_domain()

978

979
@pytest.fixture
def opensearch_endpoint(opensearch_domain, aws_client) -> str:
    """HTTPS endpoint URL of the test OpenSearch domain."""
    domain_status = aws_client.opensearch.describe_domain(DomainName=opensearch_domain)[
        "DomainStatus"
    ]
    assert "Endpoint" in domain_status
    return f"https://{domain_status['Endpoint']}"

985

986
@pytest.fixture
def opensearch_document_path(opensearch_endpoint, aws_client):
    """Indexes a sample document into the test domain and returns the document's URL."""
    document = {
        "first_name": "Boba",
        "last_name": "Fett",
        "age": 41,
        "about": "I'm just a simple man, trying to make my way in the universe.",
        "interests": ["mandalorian armor", "tusken culture"],
    }
    document_path = f"{opensearch_endpoint}/bountyhunters/_doc/1"
    headers = {"content-type": "application/json", "Accept-encoding": "identity"}
    response = requests.put(document_path, data=json.dumps(document), headers=headers)
    assert response.status_code == 201, f"could not create document at: {document_path}"
    return document_path

1004

1005
# Cleanup fixtures
1006
@pytest.fixture
def cleanup_stacks(aws_client):
    """Fixture returning a helper that best-effort deletes CloudFormation stacks and
    waits until deletion completes."""

    def _cleanup_stacks(stacks: List[str]) -> None:
        for stack in ensure_list(stacks):
            try:
                aws_client.cloudformation.delete_stack(StackName=stack)
                aws_client.cloudformation.get_waiter("stack_delete_complete").wait(StackName=stack)
            except Exception:
                LOG.debug("Failed to cleanup stack '%s'", stack)

    return _cleanup_stacks

1019

1020
@pytest.fixture
def cleanup_changesets(aws_client):
    """Fixture returning a helper that best-effort deletes CloudFormation change sets."""

    def _cleanup_changesets(changesets: List[str]) -> None:
        for changeset in ensure_list(changesets):
            try:
                aws_client.cloudformation.delete_change_set(ChangeSetName=changeset)
            except Exception:
                LOG.debug("Failed to cleanup changeset '%s'", changeset)

    return _cleanup_changesets

1032

1033
# Helpers for Cfn
1034

1035

1036
# TODO: exports(!)
1037
@dataclasses.dataclass(frozen=True)
class DeployResult:
    """Immutable result of a successful `deploy_cfn_template` deployment."""

    # identifiers returned by CloudFormation for the executed change set / stack
    change_set_id: str
    stack_id: str
    stack_name: str
    change_set_name: str
    # stack output key -> output value
    outputs: Dict[str, str]

    # callable that deletes the stack and waits for deletion to finish
    destroy: Callable[[], None]

1047

1048
class StackDeployError(Exception):
    """Raised when a CloudFormation stack fails to reach its target state during deployment."""

    def __init__(self, describe_res: dict, events: list[dict]):
        self.describe_result = describe_res
        self.events = events

        encoded_describe_output = json.dumps(self.describe_result, cls=CustomEncoder)
        event_summary = self.format_events(events)
        if config.CFN_VERBOSE_ERRORS:
            msg = f"Describe output:\n{encoded_describe_output}\nEvents:\n{event_summary}"
        else:
            msg = f"Describe output:\n{encoded_describe_output}\nFailing resources:\n{event_summary}"

        super().__init__(msg)

    def format_events(self, events: list[dict]) -> str:
        """Render the failed (or, in verbose mode, all) events chronologically, one per line."""
        chronological = sorted(events, key=lambda event: event["Timestamp"])
        return "\n".join(
            self.format_event(event)
            for event in chronological
            if event["ResourceStatus"].endswith("FAILED") or config.CFN_VERBOSE_ERRORS
        )

    @staticmethod
    def format_event(event: dict) -> str:
        """Render one stack event as a single-line summary, including the failure reason if any."""
        summary = f"- {event['LogicalResourceId']} ({event['ResourceType']}) -> {event['ResourceStatus']}"
        if reason := event.get("ResourceStatusReason"):
            reason = reason.replace("\n", "; ")
            return f"{summary} ({reason})"
        return summary

1079

1080
@pytest.fixture
def deploy_cfn_template(
    aws_client: ServiceLevelClientFactory,
):
    """Fixture returning a helper that deploys a CloudFormation template via a change set.

    Every stack deployed through the returned callable is deleted on teardown, in
    reverse creation order (so dependent stacks go first).
    """
    # (stack_id, destroy-callable) pairs, in creation order
    state: list[tuple[str, Callable]] = []

    def _deploy(
        *,
        is_update: Optional[bool] = False,  # True: UPDATE change set against an existing stack
        stack_name: Optional[str] = None,  # required when is_update is True
        change_set_name: Optional[str] = None,
        template: Optional[str] = None,  # raw template body (ignored if template_path given)
        template_path: Optional[str | os.PathLike] = None,
        template_mapping: Optional[Dict[str, Any]] = None,  # substitutions for render_template
        parameters: Optional[Dict[str, str]] = None,  # CFN parameter key -> value
        role_arn: Optional[str] = None,  # optional service role for the deployment
        max_wait: Optional[int] = None,  # seconds; defaults depend on target (AWS vs LocalStack)
        delay_between_polls: Optional[int] = 2,
        custom_aws_client: Optional[ServiceLevelClientFactory] = None,
    ) -> DeployResult:
        if is_update:
            assert stack_name
        stack_name = stack_name or f"stack-{short_uid()}"
        change_set_name = change_set_name or f"change-set-{short_uid()}"

        # real AWS deployments can take much longer than LocalStack ones
        if max_wait is None:
            max_wait = 1800 if is_aws_cloud() else 180

        # template_path takes precedence over an inline template body
        if template_path is not None:
            template = load_template_file(template_path)
        template_rendered = render_template(template, **(template_mapping or {}))

        kwargs = dict(
            StackName=stack_name,
            ChangeSetName=change_set_name,
            TemplateBody=template_rendered,
            Capabilities=["CAPABILITY_AUTO_EXPAND", "CAPABILITY_IAM", "CAPABILITY_NAMED_IAM"],
            ChangeSetType=("UPDATE" if is_update else "CREATE"),
            Parameters=[
                {
                    "ParameterKey": k,
                    "ParameterValue": v,
                }
                for (k, v) in (parameters or {}).items()
            ],
        )
        if role_arn is not None:
            kwargs["RoleARN"] = role_arn

        cfn_aws_client = custom_aws_client if custom_aws_client is not None else aws_client

        response = cfn_aws_client.cloudformation.create_change_set(**kwargs)

        change_set_id = response["Id"]
        stack_id = response["StackId"]

        # wait for the change set to be created, then execute it
        cfn_aws_client.cloudformation.get_waiter(WAITER_CHANGE_SET_CREATE_COMPLETE).wait(
            ChangeSetName=change_set_id
        )
        cfn_aws_client.cloudformation.execute_change_set(ChangeSetName=change_set_id)
        stack_waiter = cfn_aws_client.cloudformation.get_waiter(
            WAITER_STACK_UPDATE_COMPLETE if is_update else WAITER_STACK_CREATE_COMPLETE
        )

        try:
            stack_waiter.wait(
                StackName=stack_id,
                WaiterConfig={
                    "Delay": delay_between_polls,
                    "MaxAttempts": max_wait / delay_between_polls,
                },
            )
        except botocore.exceptions.WaiterError as e:
            # surface the stack description and its events for easier debugging
            raise StackDeployError(
                cfn_aws_client.cloudformation.describe_stacks(StackName=stack_id)["Stacks"][0],
                cfn_aws_client.cloudformation.describe_stack_events(StackName=stack_id)[
                    "StackEvents"
                ],
            ) from e

        describe_stack_res = cfn_aws_client.cloudformation.describe_stacks(StackName=stack_id)[
            "Stacks"
        ][0]
        outputs = describe_stack_res.get("Outputs", [])

        mapped_outputs = {o["OutputKey"]: o.get("OutputValue") for o in outputs}

        def _destroy_stack():
            # delete the stack and wait until the deletion has completed
            cfn_aws_client.cloudformation.delete_stack(StackName=stack_id)
            cfn_aws_client.cloudformation.get_waiter(WAITER_STACK_DELETE_COMPLETE).wait(
                StackName=stack_id,
                WaiterConfig={
                    "Delay": delay_between_polls,
                    "MaxAttempts": max_wait / delay_between_polls,
                },
            )

        state.append((stack_id, _destroy_stack))

        return DeployResult(
            change_set_id, stack_id, stack_name, change_set_name, mapped_outputs, _destroy_stack
        )

    yield _deploy

    # delete the stacks in the reverse order they were created in case of inter-stack dependencies
    for stack_id, teardown in state[::-1]:
        try:
            teardown()
        except Exception as e:
            LOG.debug("Failed cleaning up stack stack_id=%s: %s", stack_id, e)

1192

1193
@pytest.fixture
def is_change_set_created_and_available(aws_client):
    """Fixture returning a predicate factory: the produced predicate is True once the
    change set reached CREATE_COMPLETE and is AVAILABLE for execution."""

    def _is_change_set_created_and_available(change_set_id: str):
        def _inner():
            # TODO: CREATE_FAILED should also not lead to further retries
            change_set = aws_client.cloudformation.describe_change_set(ChangeSetName=change_set_id)
            created = change_set.get("Status") == "CREATE_COMPLETE"
            available = change_set.get("ExecutionStatus") == "AVAILABLE"
            return created and available

        return _inner

    return _is_change_set_created_and_available

1208

1209
@pytest.fixture
def is_change_set_failed_and_unavailable(aws_client):
    """Fixture returning a predicate factory: the produced predicate is True once the
    change set is in status FAILED with ExecutionStatus UNAVAILABLE.

    Note: the inner helpers used to carry the copy-pasted name
    `_is_change_set_created_and_available` from the sibling fixture, which contradicted
    this fixture's semantics; they are renamed here (local names only, fixture
    interface unchanged).
    """

    def _is_change_set_failed_and_unavailable(change_set_id: str):
        def _inner():
            change_set = aws_client.cloudformation.describe_change_set(ChangeSetName=change_set_id)
            return (
                change_set.get("Status") == "FAILED"
                and change_set.get("ExecutionStatus") == "UNAVAILABLE"
            )

        return _inner

    return _is_change_set_failed_and_unavailable

1224

1225
@pytest.fixture
def is_stack_created(aws_client):
    """Predicate factory: stack reached a terminal CREATE state (complete or failed)."""
    terminal_statuses = ["CREATE_COMPLETE", "CREATE_FAILED"]
    return _has_stack_status(aws_client.cloudformation, terminal_statuses)

1229

1230
@pytest.fixture
def is_stack_updated(aws_client):
    """Predicate factory: stack reached a terminal UPDATE state (complete or failed)."""
    terminal_statuses = ["UPDATE_COMPLETE", "UPDATE_FAILED"]
    return _has_stack_status(aws_client.cloudformation, terminal_statuses)

1234

1235
@pytest.fixture
def is_stack_deleted(aws_client):
    """Predicate factory: stack reached DELETE_COMPLETE."""
    terminal_statuses = ["DELETE_COMPLETE"]
    return _has_stack_status(aws_client.cloudformation, terminal_statuses)

1239

1240
def _has_stack_status(cfn_client, statuses: List[str]):
    """Build a predicate factory that checks whether a stack's current status is in `statuses`."""

    def _has_status(stack_id: str):
        def _inner():
            # the lookup uses the stack id, so exactly one stack comes back
            stacks = cfn_client.describe_stacks(StackName=stack_id)["Stacks"]
            return stacks[0].get("StackStatus") in statuses

        return _inner

    return _has_status

1251

1252
@pytest.fixture
def is_change_set_finished(aws_client):
    """Predicate factory: True once the change set finished executing; raises
    ShortCircuitWaitException when execution failed, to abort further polling."""

    def _is_change_set_finished(change_set_id: str, stack_name: Optional[str] = None):
        def _inner():
            kwargs = {"ChangeSetName": change_set_id}
            if stack_name:
                kwargs["StackName"] = stack_name

            check_set = aws_client.cloudformation.describe_change_set(**kwargs)
            execution_status = check_set.get("ExecutionStatus")

            if execution_status == "EXECUTE_FAILED":
                LOG.warning("Change set failed")
                raise ShortCircuitWaitException()

            return execution_status == "EXECUTE_COMPLETE"

        return _inner

    return _is_change_set_finished

1272

1273
@pytest.fixture
def wait_until_lambda_ready(aws_client):
    """Fixture returning a helper that blocks until a Lambda function leaves the
    "Pending" state.

    The helper accepts:
    - function_name: name of the function to wait for
    - qualifier: optional version/alias to check a specific qualified configuration
    - client: optional Lambda client to use instead of the default one
    """

    def _wait_until_ready(function_name: str, qualifier: str = None, client=None):
        client = client or aws_client.lambda_

        def _is_not_pending():
            kwargs = {}
            if qualifier:
                kwargs["Qualifier"] = qualifier
            try:
                # bugfix: kwargs (carrying the Qualifier) was previously built but never
                # passed to get_function, so the qualifier was silently ignored
                result = (
                    client.get_function(FunctionName=function_name, **kwargs)["Configuration"][
                        "State"
                    ]
                    != "Pending"
                )
                LOG.debug("lambda state result: result=%s", result)
                return result
            except Exception as e:
                LOG.error(e)
                raise

        wait_until(_is_not_pending)

    return _wait_until_ready

1297

1298
role_assume_policy = """
1✔
1299
{
1300
  "Version": "2012-10-17",
1301
  "Statement": [
1302
    {
1303
      "Effect": "Allow",
1304
      "Principal": {
1305
        "Service": "lambda.amazonaws.com"
1306
      },
1307
      "Action": "sts:AssumeRole"
1308
    }
1309
  ]
1310
}
1311
""".strip()
1312

1313
role_policy = """
1✔
1314
{
1315
    "Version": "2012-10-17",
1316
    "Statement": [
1317
        {
1318
            "Effect": "Allow",
1319
            "Action": [
1320
                "logs:CreateLogGroup",
1321
                "logs:CreateLogStream",
1322
                "logs:PutLogEvents"
1323
            ],
1324
            "Resource": [
1325
                "*"
1326
            ]
1327
        }
1328
    ]
1329
}
1330
""".strip()
1331

1332

1333
@pytest.fixture
def create_lambda_function_aws(aws_client):
    """Factory fixture wrapping `lambda_.create_function` directly (raw boto kwargs);
    waits for the function to leave "Pending" and deletes it on teardown."""
    function_arns = []

    def _create_lambda_function(**kwargs):
        def _create_function():
            response = aws_client.lambda_.create_function(**kwargs)
            function_arns.append(response["FunctionArn"])

            def _is_not_pending():
                try:
                    state = aws_client.lambda_.get_function(FunctionName=response["FunctionName"])[
                        "Configuration"
                    ]["State"]
                    return state != "Pending"
                except Exception as e:
                    LOG.error(e)
                    raise

            wait_until(_is_not_pending)
            return response

        # @AWS, takes about 10s until the role/policy is "active", until then it will fail
        # localstack should normally not require the retries and will just continue here
        return retry(_create_function, retries=3, sleep=4)

    yield _create_lambda_function

    for function_arn in function_arns:
        try:
            aws_client.lambda_.delete_function(FunctionName=function_arn)
        except Exception:
            LOG.debug("Unable to delete function arn=%s in cleanup", function_arn)

1370

1371
@pytest.fixture
def create_lambda_function(aws_client, wait_until_lambda_ready, lambda_su_role):
    """Factory fixture around `testutil.create_lambda_function`; deletes the created
    functions and their CloudWatch log groups on teardown.

    The factory requires a `func_name` keyword; `client` and `role` default to the
    primary Lambda client and the superuser test role respectively.
    """
    # (function ARN, client used to create it) pairs for cleanup
    lambda_arns_and_clients = []
    log_groups = []
    lambda_client = aws_client.lambda_
    logs_client = aws_client.logs
    s3_client = aws_client.s3

    def _create_lambda_function(*args, **kwargs):
        client = kwargs.get("client") or lambda_client
        kwargs["client"] = client
        kwargs["s3_client"] = s3_client
        func_name = kwargs.get("func_name")
        assert func_name
        # testutil.create_lambda_function takes the name positionally, not as a kwarg
        del kwargs["func_name"]

        if not kwargs.get("role"):
            kwargs["role"] = lambda_su_role

        def _create_function():
            resp = testutil.create_lambda_function(func_name, **kwargs)
            lambda_arns_and_clients.append((resp["CreateFunctionResponse"]["FunctionArn"], client))
            wait_until_lambda_ready(function_name=func_name, client=client)
            log_group_name = f"/aws/lambda/{func_name}"
            log_groups.append(log_group_name)
            return resp

        # @AWS, takes about 10s until the role/policy is "active", until then it will fail
        # localstack should normally not require the retries and will just continue here
        return retry(_create_function, retries=3, sleep=4)

    yield _create_lambda_function

    for arn, client in lambda_arns_and_clients:
        try:
            client.delete_function(FunctionName=arn)
        except Exception:
            LOG.debug("Unable to delete function arn=%s in cleanup", arn)

    for log_group_name in log_groups:
        try:
            logs_client.delete_log_group(logGroupName=log_group_name)
        except Exception:
            LOG.debug("Unable to delete log group %s in cleanup", log_group_name)

1416

1417
@pytest.fixture
def create_echo_http_server(aws_client, create_lambda_function):
    """Factory fixture that deploys a Lambda function (with a public function URL)
    which echoes back details of every incoming HTTP request."""
    from localstack.aws.api.lambda_ import Runtime

    lambda_client = aws_client.lambda_
    # Source code of the echo Lambda; TRIM_X_HEADERS env var controls stripping of
    # x-amzn*/x-forwarded-* headers from the echoed response.
    handler_code = textwrap.dedent(
        """
    import json
    import os


    def make_response(body: dict, status_code: int = 200):
        return {
            "statusCode": status_code,
            "headers": {"Content-Type": "application/json"},
            "body": body,
        }


    def trim_headers(headers):
        if not int(os.getenv("TRIM_X_HEADERS", 0)):
            return headers
        return {
            key: value for key, value in headers.items()
            if not (key.startswith("x-amzn") or key.startswith("x-forwarded-"))
        }


    def handler(event, context):
        print(json.dumps(event))
        response = {
            "args": event.get("queryStringParameters", {}),
            "data": event.get("body", ""),
            "domain": event["requestContext"].get("domainName", ""),
            "headers": trim_headers(event.get("headers", {})),
            "method": event["requestContext"]["http"].get("method", ""),
            "origin": event["requestContext"]["http"].get("sourceIp", ""),
            "path": event["requestContext"]["http"].get("path", ""),
        }
        return make_response(response)"""
    )

    def _create_echo_http_server(trim_x_headers: bool = False) -> str:
        """Creates a server that will echo any request. Any request will be returned with the
        following format. Any unset values will have those defaults.
        `trim_x_headers` can be set to True to trim some headers that are automatically added by lambda in
        order to create easier Snapshot testing. Default: `False`
        {
          "args": {},
          "headers": {},
          "data": "",
          "method": "",
          "domain": "",
          "origin": "",
          "path": ""
        }"""
        zip_file = testutil.create_lambda_archive(handler_code, get_content=True)
        func_name = f"echo-http-{short_uid()}"
        create_lambda_function(
            func_name=func_name,
            zip_file=zip_file,
            runtime=Runtime.python3_12,
            envvars={"TRIM_X_HEADERS": "1" if trim_x_headers else "0"},
        )
        # expose the function via a function URL and allow unauthenticated invocation
        url_response = lambda_client.create_function_url_config(
            FunctionName=func_name, AuthType="NONE"
        )
        aws_client.lambda_.add_permission(
            FunctionName=func_name,
            StatementId="urlPermission",
            Action="lambda:InvokeFunctionUrl",
            Principal="*",
            FunctionUrlAuthType="NONE",
        )
        return url_response["FunctionUrl"]

    yield _create_echo_http_server

1495

1496
@pytest.fixture
def create_event_source_mapping(aws_client):
    """Factory fixture for Lambda event source mappings; deletes them again on teardown."""
    mapping_uuids = []

    def _create_event_source_mapping(*args, **kwargs):
        response = aws_client.lambda_.create_event_source_mapping(*args, **kwargs)
        mapping_uuids.append(response["UUID"])
        return response

    yield _create_event_source_mapping

    for mapping_uuid in mapping_uuids:
        try:
            aws_client.lambda_.delete_event_source_mapping(UUID=mapping_uuid)
        except Exception:
            LOG.debug("Unable to delete event source mapping %s in cleanup", mapping_uuid)

1513

1514
@pytest.fixture
def check_lambda_logs(aws_client):
    """Fixture returning a helper that asserts expected lines appear in a Lambda
    function's CloudWatch logs.

    Expected lines containing ".*" are treated as regular expressions (matched with
    DOTALL); all other lines must be present verbatim. Returns all log messages.
    """

    def _check_logs(func_name: str, expected_lines: List[str] = None) -> List[str]:
        expected_lines = expected_lines or []
        log_events = get_lambda_logs(func_name, logs_client=aws_client.logs)
        log_messages = [event["message"] for event in log_events]
        for expected in expected_lines:
            if ".*" in expected:
                matches = [re.match(expected, message, flags=re.DOTALL) for message in log_messages]
                if any(matches):
                    continue
            assert expected in log_messages
        return log_messages

    return _check_logs

1531

1532
@pytest.fixture
def create_policy(aws_client):
    """Factory fixture for IAM policies; deletes all created policies on teardown
    (using the same client each policy was created with)."""
    created_policies = []

    def _create_policy(*args, iam_client=None, **kwargs):
        iam_client = iam_client or aws_client.iam
        if "PolicyName" not in kwargs:
            kwargs["PolicyName"] = f"policy-{short_uid()}"
        response = iam_client.create_policy(*args, **kwargs)
        created_policies.append((response["Policy"]["Arn"], iam_client))
        return response

    yield _create_policy

    for policy_arn, iam_client in created_policies:
        try:
            iam_client.delete_policy(PolicyArn=policy_arn)
        except Exception:
            LOG.debug("Could not delete policy '%s' during test cleanup", policy_arn)

1553

1554
@pytest.fixture
def create_user(aws_client):
    """Factory fixture for IAM users.

    On teardown, for every created user it best-effort removes inline policies,
    detaches managed policies, deletes access keys, and finally deletes the user
    (IAM requires these to be removed before user deletion succeeds).
    """
    usernames = []

    def _create_user(**kwargs):
        if "UserName" not in kwargs:
            kwargs["UserName"] = f"user-{short_uid()}"
        response = aws_client.iam.create_user(**kwargs)
        usernames.append(response["User"]["UserName"])
        return response

    yield _create_user

    for username in usernames:
        try:
            inline_policies = aws_client.iam.list_user_policies(UserName=username)["PolicyNames"]
        except ClientError as e:
            # user is gone already (e.g. deleted by the test itself); skip cleanup
            LOG.debug(
                "Cannot list user policies: %s. User %s probably already deleted...", e, username
            )
            continue

        # 1) remove inline policies
        for inline_policy in inline_policies:
            try:
                aws_client.iam.delete_user_policy(UserName=username, PolicyName=inline_policy)
            except Exception:
                LOG.debug(
                    "Could not delete user policy '%s' from '%s' during cleanup",
                    inline_policy,
                    username,
                )
        # 2) detach managed policies
        attached_policies = aws_client.iam.list_attached_user_policies(UserName=username)[
            "AttachedPolicies"
        ]
        for attached_policy in attached_policies:
            try:
                aws_client.iam.detach_user_policy(
                    UserName=username, PolicyArn=attached_policy["PolicyArn"]
                )
            except Exception:
                LOG.debug(
                    "Error detaching policy '%s' from user '%s'",
                    attached_policy["PolicyArn"],
                    username,
                )
        # 3) delete access keys
        access_keys = aws_client.iam.list_access_keys(UserName=username)["AccessKeyMetadata"]
        for access_key in access_keys:
            try:
                aws_client.iam.delete_access_key(
                    UserName=username, AccessKeyId=access_key["AccessKeyId"]
                )
            except Exception:
                LOG.debug(
                    "Error deleting access key '%s' from user '%s'",
                    access_key["AccessKeyId"],
                    username,
                )

        # 4) finally delete the user itself
        try:
            aws_client.iam.delete_user(UserName=username)
        except Exception as e:
            LOG.debug("Error deleting user '%s' during test cleanup: %s", username, e)

1617

1618
@pytest.fixture
def wait_and_assume_role(aws_client):
    """Fixture returning a helper that assumes an IAM role, retrying until AWS
    propagation allows it, and returns the temporary credentials."""

    def _wait_and_assume_role(role_arn: str, session_name: str = None, **kwargs):
        session_name = session_name or f"session-{short_uid()}"

        def assume_role():
            response = aws_client.sts.assume_role(
                RoleArn=role_arn, RoleSessionName=session_name, **kwargs
            )
            return response["Credentials"]

        # need to retry a couple of times before we are allowed to assume this role in AWS
        return retry(assume_role, sleep=5, retries=4)

    return _wait_and_assume_role

1635

1636
@pytest.fixture
def create_role(aws_client):
    """Factory fixture creating IAM roles; roles and their policies are cleaned up on teardown."""
    created_roles = []

    def _create_role(iam_client=None, **kwargs):
        if not kwargs.get("RoleName"):
            kwargs["RoleName"] = f"role-{short_uid()}"
        client = iam_client or aws_client.iam
        result = client.create_role(**kwargs)
        # remember the client used for creation so cleanup targets the same account/region
        created_roles.append((result["Role"]["RoleName"], client))
        return result

    yield _create_role

    for role_name, client in created_roles:
        # detach policies
        try:
            attached = client.list_attached_role_policies(RoleName=role_name)["AttachedPolicies"]
        except ClientError as e:
            LOG.debug(
                "Cannot list attached role policies: %s. Role %s probably already deleted...",
                e,
                role_name,
            )
            continue
        for policy in attached:
            try:
                client.detach_role_policy(RoleName=role_name, PolicyArn=policy["PolicyArn"])
            except Exception:
                LOG.debug(
                    "Could not detach role policy '%s' from '%s' during cleanup",
                    policy["PolicyArn"],
                    role_name,
                )
        # delete inline policies before the role itself
        for inline_policy in client.list_role_policies(RoleName=role_name)["PolicyNames"]:
            try:
                client.delete_role_policy(RoleName=role_name, PolicyName=inline_policy)
            except Exception:
                LOG.debug(
                    "Could not delete role policy '%s' from '%s' during cleanup",
                    inline_policy,
                    role_name,
                )
        try:
            client.delete_role(RoleName=role_name)
        except Exception:
            LOG.debug("Could not delete role '%s' during cleanup", role_name)
1688

1689

1690
@pytest.fixture
def create_parameter(aws_client):
    """Factory fixture creating SSM parameters; created parameters are deleted on teardown."""
    created_names = []

    def _create_parameter(**kwargs):
        created_names.append(kwargs["Name"])
        return aws_client.ssm.put_parameter(**kwargs)

    yield _create_parameter

    for name in created_names:
        aws_client.ssm.delete_parameter(Name=name)
1702

1703

1704
@pytest.fixture
def create_secret(aws_client):
    """Factory fixture creating Secrets Manager secrets; force-deleted on teardown."""
    secret_arns = []

    def _create_secret(**kwargs):
        response = aws_client.secretsmanager.create_secret(**kwargs)
        secret_arns.append(response["ARN"])
        return response

    yield _create_secret

    for secret_arn in secret_arns:
        aws_client.secretsmanager.delete_secret(
            SecretId=secret_arn, ForceDeleteWithoutRecovery=True
        )
1717

1718

1719
# TODO Figure out how to make cert creation tests pass against AWS.
1720
#
1721
# We would like to have localstack tests to pass not just against localstack, but also against AWS to make sure
1722
# our emulation is correct. Unfortunately, with certificate creation there are some issues.
1723
#
1724
# In AWS newly created ACM certificates have to be validated either by email or by DNS. The latter is
1725
# by adding some CNAME records as requested by AWS in response to a certificate request.
1726
# For testing purposes the DNS one seems to be easier, at least as long as DNS is handled by the Route 53 AWS DNS service.
1727
#
1728
# The other possible option is to use IAM certificates instead of ACM ones. Those just have to be uploaded from files
1729
# created by openssl etc. Not sure if there are other issues after that.
1730
#
1731
# The third option might be having in AWS some certificates created in advance - so they do not require validation
1732
# and can be easily used in tests. The issue with such an approach is that for AppSync, for example, in order to
1733
# register a domain name (https://docs.aws.amazon.com/appsync/latest/APIReference/API_CreateDomainName.html),
1734
# the domain name in the API request has to match the domain name used in certificate creation. Which means that with
1735
# pre-created certificates we would have to use specific domain names instead of random ones.
1736
@pytest.fixture
def acm_request_certificate(aws_client_factory):
    """Factory fixture requesting ACM certificates; certificates are deleted on teardown.

    The factory accepts the same kwargs as ``acm.request_certificate`` plus an
    optional ``region_name`` to target a specific region.
    """
    certificate_arns = []

    def factory(**kwargs) -> dict:
        # fix: the return annotation previously claimed ``str`` although the
        # full RequestCertificate response dict is returned
        if "DomainName" not in kwargs:
            kwargs["DomainName"] = f"test-domain-{short_uid()}.localhost.localstack.cloud"

        region_name = kwargs.pop("region_name", None)
        acm_client = aws_client_factory(region_name=region_name).acm

        response = acm_client.request_certificate(**kwargs)
        created_certificate_arn = response["CertificateArn"]
        # remember the region so cleanup uses a client for the same region
        certificate_arns.append((created_certificate_arn, region_name))
        return response

    yield factory

    # cleanup
    for certificate_arn, region_name in certificate_arns:
        try:
            acm_client = aws_client_factory(region_name=region_name).acm
            acm_client.delete_certificate(CertificateArn=certificate_arn)
        except Exception as e:
            LOG.debug("error cleaning up certificate %s: %s", certificate_arn, e)
1761

1762

1763
# IAM policy document granting allow-all ("superuser") permissions on every
# action and resource; attached to test roles that need unrestricted access.
role_policy_su = {
    "Version": "2012-10-17",
    "Statement": [{"Effect": "Allow", "Action": ["*"], "Resource": ["*"]}],
}
1767

1768

1769
@pytest.fixture(scope="session")
def lambda_su_role(aws_client):
    """Session-scoped IAM role with allow-all permissions for use as a Lambda execution role.

    Yields the role ARN; the policy attachment, role, and policy are removed on teardown.
    """
    role_name = f"lambda-autogenerated-{short_uid()}"
    role = aws_client.iam.create_role(
        RoleName=role_name, AssumeRolePolicyDocument=role_assume_policy
    )["Role"]
    policy_name = f"lambda-autogenerated-{short_uid()}"
    policy_arn = aws_client.iam.create_policy(
        PolicyName=policy_name, PolicyDocument=json.dumps(role_policy_su)
    )["Policy"]["Arn"]
    aws_client.iam.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)

    if is_aws_cloud():  # dirty but necessary: wait for IAM role propagation on AWS
        time.sleep(10)

    yield role["Arn"]

    # fix: pass the callable plus its kwargs to run_safe instead of the already-evaluated
    # call result - previously the API calls ran *before* run_safe, so exceptions
    # during cleanup were not actually suppressed
    run_safe(aws_client.iam.detach_role_policy, RoleName=role_name, PolicyArn=policy_arn)
    run_safe(aws_client.iam.delete_role, RoleName=role_name)
    run_safe(aws_client.iam.delete_policy, PolicyArn=policy_arn)
1789

1790

1791
@pytest.fixture
def create_iam_role_and_attach_policy(aws_client):
    """
    Fixture that creates an IAM role with given role definition and predefined policy ARN.

    Use this fixture with AWS managed policies like 'AmazonS3ReadOnlyAccess' or 'AmazonKinesisFullAccess'.
    """
    roles = []

    def _inner(**kwargs: dict[str, any]) -> str:
        """
        :param dict RoleDefinition: role definition document
        :param str PolicyArn: policy ARN
        :param str RoleName: role name (autogenerated if omitted)
        :return: role ARN
        """
        if "RoleName" not in kwargs:
            kwargs["RoleName"] = f"test-role-{short_uid()}"

        role = kwargs["RoleName"]
        role_policy = json.dumps(kwargs["RoleDefinition"])

        result = aws_client.iam.create_role(RoleName=role, AssumeRolePolicyDocument=role_policy)
        role_arn = result["Role"]["Arn"]

        policy_arn = kwargs["PolicyArn"]
        aws_client.iam.attach_role_policy(PolicyArn=policy_arn, RoleName=role)

        # fix: also track the policy ARN so it can be detached again during cleanup -
        # DeleteRole fails (on AWS) while managed policies are still attached
        roles.append((role, policy_arn))
        return role_arn

    yield _inner

    for role, policy_arn in roles:
        try:
            aws_client.iam.detach_role_policy(RoleName=role, PolicyArn=policy_arn)
        except Exception as exc:
            LOG.debug("Error detaching policy '%s' from IAM role '%s': %s", policy_arn, role, exc)
        try:
            aws_client.iam.delete_role(RoleName=role)
        except Exception as exc:
            LOG.debug("Error deleting IAM role '%s': %s", role, exc)
1829

1830

1831
@pytest.fixture
def create_iam_role_with_policy(aws_client):
    """
    Fixture that creates an IAM role with given role definition and policy definition.
    """
    role_to_policy = {}

    def _create_role_and_policy(**kwargs: dict[str, any]) -> str:
        """
        :param dict RoleDefinition: role definition document
        :param dict PolicyDefinition: policy definition document
        :param str PolicyName: policy name (autogenerated if omitted)
        :param str RoleName: role name (autogenerated if omitted)
        :return: role ARN
        """
        if "RoleName" not in kwargs:
            kwargs["RoleName"] = f"test-role-{short_uid()}"
        if "PolicyName" not in kwargs:
            kwargs["PolicyName"] = f"test-policy-{short_uid()}"
        role = kwargs["RoleName"]
        policy = kwargs["PolicyName"]

        trust_policy = json.dumps(kwargs["RoleDefinition"])
        role_arn = aws_client.iam.create_role(
            RoleName=role, AssumeRolePolicyDocument=trust_policy
        )["Role"]["Arn"]

        aws_client.iam.put_role_policy(
            RoleName=role,
            PolicyName=policy,
            PolicyDocument=json.dumps(kwargs["PolicyDefinition"]),
        )
        role_to_policy[role] = policy
        return role_arn

    yield _create_role_and_policy

    for role_name, policy_name in role_to_policy.items():
        try:
            aws_client.iam.delete_role_policy(RoleName=role_name, PolicyName=policy_name)
        except Exception as exc:
            LOG.debug("Error deleting IAM role policy '%s' '%s': %s", role_name, policy_name, exc)
        try:
            aws_client.iam.delete_role(RoleName=role_name)
        except Exception as exc:
            LOG.debug("Error deleting IAM role '%s': %s", role_name, exc)
1875

1876

1877
@pytest.fixture
def firehose_create_delivery_stream(wait_for_delivery_stream_ready, aws_client):
    """Factory fixture creating Firehose delivery streams; streams are deleted on teardown."""
    created_streams = []

    def _create_delivery_stream(**kwargs):
        if "DeliveryStreamName" not in kwargs:
            kwargs["DeliveryStreamName"] = f"test-delivery-stream-{short_uid()}"
        stream_name = kwargs["DeliveryStreamName"]
        response = aws_client.firehose.create_delivery_stream(**kwargs)
        created_streams.append(stream_name)
        # block until the stream is usable before handing it to the test
        wait_for_delivery_stream_ready(stream_name)
        return response

    yield _create_delivery_stream

    for stream_name in created_streams:
        try:
            aws_client.firehose.delete_delivery_stream(DeliveryStreamName=stream_name)
        except Exception:
            LOG.info("Failed to delete delivery stream %s", stream_name)
1897

1898

1899
@pytest.fixture
def ses_configuration_set(aws_client):
    """Factory fixture creating SES configuration sets, removed again on teardown."""
    created_sets = []

    def factory(name: str) -> None:
        aws_client.ses.create_configuration_set(
            ConfigurationSet={
                "Name": name,
            },
        )
        created_sets.append(name)

    yield factory

    for set_name in created_sets:
        aws_client.ses.delete_configuration_set(ConfigurationSetName=set_name)
1915

1916

1917
@pytest.fixture
def ses_configuration_set_sns_event_destination(aws_client):
    """Factory fixture adding SNS event destinations to SES configuration sets;
    destinations are removed again on teardown."""
    created_destinations = []

    def factory(config_set_name: str, event_destination_name: str, topic_arn: str) -> None:
        aws_client.ses.create_configuration_set_event_destination(
            ConfigurationSetName=config_set_name,
            EventDestination={
                "Name": event_destination_name,
                "Enabled": True,
                "MatchingEventTypes": ["send", "bounce", "delivery", "open", "click"],
                "SNSDestination": {
                    "TopicARN": topic_arn,
                },
            },
        )
        created_destinations.append((config_set_name, event_destination_name))

    yield factory

    for set_name, destination_name in created_destinations:
        aws_client.ses.delete_configuration_set_event_destination(
            ConfigurationSetName=set_name,
            EventDestinationName=destination_name,
        )
1942

1943

1944
@pytest.fixture
def ses_email_template(aws_client):
    """Factory fixture creating SES email templates, deleted again on teardown."""
    template_names = []

    def factory(name: str, contents: str, subject: str = None):
        # fix: the default subject used to be computed once at function definition
        # time, so every template created without an explicit subject within a
        # session shared the exact same subject; generate a fresh one per call
        if subject is None:
            subject = f"Email template {short_uid()}"
        aws_client.ses.create_template(
            Template={
                "TemplateName": name,
                "SubjectPart": subject,
                "TextPart": contents,
            }
        )
        template_names.append(name)

    yield factory

    for template_name in template_names:
        aws_client.ses.delete_template(TemplateName=template_name)
1962

1963

1964
@pytest.fixture
def ses_verify_identity(aws_client):
    """Factory fixture verifying SES email identities; identities are deleted on teardown."""
    identities = []

    def factory(email_address: str) -> None:
        aws_client.ses.verify_email_identity(EmailAddress=email_address)
        # fix: track the identity - previously the list was never populated,
        # so the teardown loop below never deleted anything
        identities.append(email_address)

    yield factory

    for identity in identities:
        aws_client.ses.delete_identity(Identity=identity)
1975

1976

1977
@pytest.fixture
def setup_sender_email_address(ses_verify_identity):
    """
    If the test is running against AWS then assume the email address passed is already
    verified, and passes the given email address through. Otherwise, it generates one random
    email address and verify them.
    """

    def inner(sender_email_address: Optional[str] = None) -> str:
        if not is_aws_cloud():
            # overwrite the given parameters with localstack specific ones
            sender_email_address = f"sender-{short_uid()}@example.com"
            ses_verify_identity(sender_email_address)
        elif sender_email_address is None:
            raise ValueError(
                "sender_email_address must be specified to run this test against AWS"
            )
        return sender_email_address

    return inner
1999

2000

2001
@pytest.fixture
def ec2_create_security_group(aws_client):
    """Factory fixture creating EC2 security groups; groups are deleted on teardown."""
    ec2_sgs = []

    def factory(ports=None, ip_protocol: str = "tcp", **kwargs):
        """
        Create the target group and authorize the security group ingress.
        :param ports: list of ports to be authorized for the ingress rule.
        :param ip_protocol: the ip protocol for the permissions (tcp by default)
        """
        if "GroupName" not in kwargs:
            # FIXME: This will fail against AWS since the sg prefix is not valid for GroupName
            # > "Group names may not be in the format sg-*".
            kwargs["GroupName"] = f"sg-{short_uid()}"
        # Making sure the call to CreateSecurityGroup gets the right arguments
        _args = select_from_typed_dict(CreateSecurityGroupRequest, kwargs)
        security_group = aws_client.ec2.create_security_group(**_args)
        security_group_id = security_group["GroupId"]

        permissions = [
            {
                "FromPort": port,
                "IpProtocol": ip_protocol,
                "IpRanges": [{"CidrIp": "0.0.0.0/0"}],
                "ToPort": port,
            }
            for port in ports or []
        ]
        # fix: only call authorize_security_group_ingress when there are permissions
        # to add - the call fails with an empty IpPermissions list (see the former
        # FIXME note on this block)
        if permissions:
            aws_client.ec2.authorize_security_group_ingress(
                GroupId=security_group_id,
                IpPermissions=permissions,
            )

        ec2_sgs.append(security_group_id)
        return security_group

    yield factory

    for sg_group_id in ec2_sgs:
        try:
            aws_client.ec2.delete_security_group(GroupId=sg_group_id)
        except Exception as e:
            LOG.debug("Error cleaning up EC2 security group: %s, %s", sg_group_id, e)
2046

2047

2048
@pytest.fixture
def cleanups():
    """Yields a list tests may append cleanup callables to; they run in reverse order on teardown."""
    callbacks = []

    yield callbacks

    # run in reverse registration order, mirroring resource creation order
    for callback in reversed(callbacks):
        try:
            callback()
        except Exception as e:
            LOG.warning("Failed to execute cleanup", exc_info=e)
2059

2060

2061
@pytest.fixture(scope="session")
def account_id(aws_client):
    """Account ID of the primary test client (resolved via STS when available)."""
    if not is_aws_cloud() and not is_api_enabled("sts"):
        return TEST_AWS_ACCOUNT_ID
    return aws_client.sts.get_caller_identity()["Account"]
2067

2068

2069
@pytest.fixture(scope="session")
def region_name(aws_client):
    """Region name of the primary test client (taken from the STS client's metadata)."""
    if not is_aws_cloud() and not is_api_enabled("sts"):
        return TEST_AWS_REGION_NAME
    return aws_client.sts.meta.region_name
2075

2076

2077
@pytest.fixture(scope="session")
def partition(region_name):
    """AWS partition (e.g. "aws", "aws-cn", "aws-us-gov") derived from the test region."""
    return get_partition(region_name)
2080

2081

2082
@pytest.fixture(scope="session")
def secondary_account_id(secondary_aws_client):
    """Account ID of the secondary test client (resolved via STS when available)."""
    if not is_aws_cloud() and not is_api_enabled("sts"):
        return SECONDARY_TEST_AWS_ACCOUNT_ID
    return secondary_aws_client.sts.get_caller_identity()["Account"]
2088

2089

2090
@pytest.fixture(scope="session")
def secondary_region_name():
    """Region name used for the secondary (cross-region) test client."""
    return SECONDARY_TEST_AWS_REGION_NAME
2093

2094

2095
@pytest.hookimpl
def pytest_collection_modifyitems(config: Config, items: list[Item]):
    """Collection hook applying LocalStack-specific markers to collected tests."""
    skip_on_aws = pytest.mark.skipif(
        is_aws_cloud(),
        reason="test only applicable if run against localstack",
    )
    for item in items:
        # any marker whose name ends in "only_localstack" makes the test AWS-skipped
        for mark in item.iter_markers():
            if mark.name.endswith("only_localstack"):
                item.add_marker(skip_on_aws)
        if hasattr(item, "fixturenames") and "snapshot" in item.fixturenames:
            # add a marker that indicates that this test is snapshot validated
            # if it uses the snapshot fixture -> allows selecting only snapshot
            # validated tests in order to capture new snapshots for a whole
            # test file with "-m snapshot_validated"
            item.add_marker("snapshot_validated")
2111

2112

2113
@pytest.fixture
def sample_stores() -> AccountRegionBundle:
    """Returns a sample AccountRegionBundle (service name "zzz", validation disabled)
    whose store class exposes one cross-account, one cross-region, and one
    region-local attribute — presumably for exercising the stores framework itself."""

    class SampleStore(BaseStore):
        CROSS_ACCOUNT_ATTR = CrossAccountAttribute(default=list)
        CROSS_REGION_ATTR = CrossRegionAttribute(default=list)
        region_specific_attr = LocalAttribute(default=list)

    return AccountRegionBundle("zzz", SampleStore, validate=False)
2121

2122

2123
@pytest.fixture
def create_rest_apigw(aws_client_factory):
    """Factory fixture creating API Gateway REST APIs; APIs and any linked usage
    plans / API keys are cleaned up on teardown."""
    rest_apis = []
    retry_boto_config = None
    if is_aws_cloud():
        retry_boto_config = botocore.config.Config(
            # Api gateway can throttle requests pretty heavily. Leading to potentially undeleted apis
            retries={"max_attempts": 10, "mode": "adaptive"}
        )

    def _create_apigateway_function(**kwargs):
        client_region_name = kwargs.pop("region_name", None)
        apigateway_client = aws_client_factory(
            region_name=client_region_name, config=retry_boto_config
        ).apigateway
        kwargs.setdefault("name", f"api-{short_uid()}")

        response = apigateway_client.create_rest_api(**kwargs)
        api_id = response.get("id")
        rest_apis.append((api_id, client_region_name))

        return api_id, response.get("name"), response.get("rootResourceId")

    yield _create_apigateway_function

    for rest_api_id, _client_region_name in rest_apis:
        apigateway_client = aws_client_factory(
            region_name=_client_region_name,
            config=retry_boto_config,
        ).apigateway
        # First, retrieve the usage plans associated with the REST API
        usage_plan_ids = [
            plan.get("id")
            for plan in apigateway_client.get_usage_plans().get("items", [])
            for api_stage in plan.get("apiStages", [])
            if api_stage.get("apiId") == rest_api_id
        ]

        # Then delete the API, as you can't delete the UsagePlan if a stage is associated with it
        with contextlib.suppress(Exception):
            apigateway_client.delete_rest_api(restApiId=rest_api_id)

        # finally delete the usage plans and the API Keys linked to it
        for usage_plan_id in usage_plan_ids:
            plan_keys = apigateway_client.get_usage_plan_keys(usagePlanId=usage_plan_id)
            for key in plan_keys.get("items", []):
                apigateway_client.delete_api_key(apiKey=key["id"])
            apigateway_client.delete_usage_plan(usagePlanId=usage_plan_id)
2172

2173

2174
@pytest.fixture
def create_rest_apigw_openapi(aws_client_factory):
    """Factory fixture importing REST APIs from OpenAPI specs; APIs are deleted on teardown."""
    imported_apis = []

    def _create_apigateway_function(**kwargs):
        region_name = kwargs.pop("region_name", None)
        client = aws_client_factory(region_name=region_name).apigateway

        response = client.import_rest_api(**kwargs)
        api_id = response.get("id")
        imported_apis.append((api_id, region_name))
        return api_id, response

    yield _create_apigateway_function

    for api_id, region_name in imported_apis:
        with contextlib.suppress(Exception):
            client = aws_client_factory(region_name=region_name).apigateway
            client.delete_rest_api(restApiId=api_id)
2193

2194

2195
@pytest.fixture
def appsync_create_api(aws_client):
    """Factory fixture creating AppSync GraphQL APIs; APIs are deleted on teardown."""
    graphql_apis = []

    def factory(**kwargs):
        if "name" not in kwargs:
            kwargs["name"] = f"graphql-api-testing-name-{short_uid()}"
        if not kwargs.get("authenticationType"):
            kwargs["authenticationType"] = "API_KEY"

        api = aws_client.appsync.create_graphql_api(**kwargs)["graphqlApi"]
        graphql_apis.append(api["apiId"])
        return api

    yield factory

    for api_id in graphql_apis:
        try:
            aws_client.appsync.delete_graphql_api(apiId=api_id)
        except Exception as e:
            LOG.debug("Error cleaning up AppSync API: %s, %s", api_id, e)
2216

2217

2218
@pytest.fixture
def assert_host_customisation(monkeypatch):
    """Patches LOCALSTACK_HOST to a known value and yields an asserter checking
    whether a given URL reflects the default or a custom host."""
    localstack_host = "foo.bar"
    monkeypatch.setattr(
        config, "LOCALSTACK_HOST", config.HostAndPort(host=localstack_host, port=8888)
    )

    def asserter(
        url: str,
        *,
        custom_host: Optional[str] = None,
    ):
        if custom_host is None:
            assert localstack_host in url, f"Could not find `{localstack_host}` in `{url}`"
        else:
            assert custom_host in url, f"Could not find `{custom_host}` in `{url}`"
            # the custom host must fully replace the patched default host
            assert localstack_host not in url

    yield asserter
2238

2239

2240
@pytest.fixture
def echo_http_server(httpserver: HTTPServer):
    """Spins up a local HTTP echo server and returns the endpoint URL"""

    def _echo(request: Request) -> Response:
        parsed_json = None
        if request.is_json:
            # tolerate a JSON content type with a non-JSON body
            with contextlib.suppress(ValueError):
                parsed_json = json.loads(request.data)
        payload = {
            "data": request.data or "{}",
            "headers": dict(request.headers),
            "url": request.url,
            "method": request.method,
            "json": parsed_json,
        }
        return Response(json.dumps(json_safe(payload)), status=200)

    httpserver.expect_request("").respond_with_handler(_echo)
    return httpserver.url_for("/")
2263

2264

2265
@pytest.fixture
def echo_http_server_post(echo_http_server):
    """
    Returns an HTTP echo server URL for POST requests that work both locally and for parity tests (against real AWS)
    """
    if not is_aws_cloud():
        return f"{echo_http_server}post"
    # against AWS, use the public echo server instead of the local fixture
    return f"{PUBLIC_HTTP_ECHO_SERVER_URL}/post"
2274

2275

2276
def create_policy_doc(effect: str, actions: List, resource=None) -> Dict:
    """Builds an IAM policy document containing a single statement.

    :param effect: statement effect, e.g. "Allow" or "Deny"
    :param actions: a single action or a list of actions
    :param resource: resource(s) the statement applies to (defaults to "*")
    """
    statement = {
        # TODO statement ids have to be alphanumeric [0-9A-Za-z], write a test for it
        "Sid": f"s{short_uid()}",
        "Effect": effect,
        "Action": ensure_list(actions),
        "Resource": resource or "*",
    }
    return {"Version": "2012-10-17", "Statement": [statement]}
2291

2292

2293
@pytest.fixture
def create_policy_generated_document(create_policy):
    """Returns a helper that creates an IAM policy from a generated single-statement document."""

    def _create_policy_with_doc(effect, actions, policy_name=None, resource=None, iam_client=None):
        name = policy_name or f"p-{short_uid()}"
        document = create_policy_doc(effect, actions, resource=resource)
        response = create_policy(
            PolicyName=name, PolicyDocument=json.dumps(document), iam_client=iam_client
        )
        return response["Policy"]["Arn"]

    return _create_policy_with_doc
2305

2306

2307
@pytest.fixture
def create_role_with_policy(create_role, create_policy_generated_document, aws_client):
    """Returns a helper creating an IAM role plus a generated policy, either
    attached as a managed policy or embedded as an inline policy."""

    def _create_role_with_policy(
        effect, actions, assume_policy_doc, resource=None, attach=True, iam_client=None
    ):
        client = iam_client or aws_client.iam

        role_name = f"role-{short_uid()}"
        role_arn = create_role(
            RoleName=role_name, AssumeRolePolicyDocument=assume_policy_doc, iam_client=client
        )["Role"]["Arn"]
        policy_name = f"p-{short_uid()}"

        if attach:
            # create role and attach role policy
            policy_arn = create_policy_generated_document(
                effect, actions, policy_name=policy_name, resource=resource, iam_client=client
            )
            client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
        else:
            # put role policy
            inline_document = json.dumps(create_policy_doc(effect, actions, resource=resource))
            client.put_role_policy(
                RoleName=role_name, PolicyName=policy_name, PolicyDocument=inline_document
            )

        return role_name, role_arn

    return _create_role_with_policy
2338

2339

2340
@pytest.fixture
def create_user_with_policy(create_policy_generated_document, create_user, aws_client):
    """Returns a helper creating an IAM user with a generated policy attached,
    returning the user name and a fresh set of access keys."""

    def _create_user_with_policy(effect, actions, resource=None):
        policy_arn = create_policy_generated_document(effect, actions, resource=resource)
        username = f"user-{short_uid()}"
        create_user(UserName=username)
        aws_client.iam.attach_user_policy(UserName=username, PolicyArn=policy_arn)
        access_keys = aws_client.iam.create_access_key(UserName=username)["AccessKey"]
        return username, access_keys

    return _create_user_with_policy
2351

2352

2353
@pytest.fixture()
def register_extension(s3_bucket, aws_client):
    """Factory fixture registering CloudFormation type extensions from a local
    artifact; registered extensions are deregistered on teardown."""
    cfn_client = aws_client.cloudformation
    extensions_arns = []

    def _register(extension_name, extension_type, artifact_path):
        key = f"artifact-{short_uid()}"
        aws_client.s3.upload_file(artifact_path, s3_bucket, key)

        registration_token = cfn_client.register_type(
            Type=extension_type,
            TypeName=extension_name,
            SchemaHandlerPackage=f"s3://{s3_bucket}/{key}",
        )["RegistrationToken"]
        cfn_client.get_waiter("type_registration_complete").wait(
            RegistrationToken=registration_token
        )

        describe_response = cfn_client.describe_type_registration(
            RegistrationToken=registration_token
        )
        extensions_arns.append(describe_response["TypeArn"])
        cfn_client.set_type_default_version(Arn=describe_response["TypeVersionArn"])

        return describe_response

    yield _register

    for arn in extensions_arns:
        # deregister the individual versions first, then the type itself
        for version in cfn_client.list_type_versions(Arn=arn)["TypeVersionSummaries"]:
            try:
                cfn_client.deregister_type(Arn=version["Arn"])
            except Exception:
                continue
        cfn_client.deregister_type(Arn=arn)
2394

2395

2396
@pytest.fixture
def hosted_zone(aws_client):
    """Factory fixture creating Route53 hosted zones; zones are deleted in
    reverse creation order on teardown."""
    zone_ids = []

    def factory(**kwargs):
        if "CallerReference" not in kwargs:
            kwargs["CallerReference"] = f"ref-{short_uid()}"
        response = aws_client.route53.create_hosted_zone(**kwargs)
        zone_ids.append(response["HostedZone"]["Id"])
        return response

    yield factory

    for zone_id in reversed(zone_ids):
        aws_client.route53.delete_hosted_zone(Id=zone_id)
2412

2413

2414
@pytest.fixture
def openapi_validate(monkeypatch):
    """Enable OpenAPI request and response validation for the duration of a test."""
    for flag_name in ("OPENAPI_VALIDATE_RESPONSE", "OPENAPI_VALIDATE_REQUEST"):
        monkeypatch.setattr(config, flag_name, "true")
1✔
2418

2419

2420
@pytest.fixture
def set_resource_custom_id():
    """Fixture to pin custom IDs on resources via the localstack ID manager.

    Every custom ID registered through the returned callable is unset on teardown.
    """
    registered_identifiers = []

    def _set_custom_id(resource_identifier: ResourceIdentifier, custom_id):
        localstack_id_manager.set_custom_id(
            resource_identifier=resource_identifier, custom_id=custom_id
        )
        registered_identifiers.append(resource_identifier)

    yield _set_custom_id

    # Teardown: remove all custom IDs that were set during the test.
    for identifier in registered_identifiers:
        localstack_id_manager.unset_custom_id(identifier)
1✔
2434

2435

2436
###############################
2437
# Events (EventBridge) fixtures
2438
###############################
2439

2440

2441
@pytest.fixture
def events_create_event_bus(aws_client, region_name, account_id):
    """Factory fixture creating EventBridge event buses.

    On teardown, for each bus created: deletes all rules on the bus (removing
    their targets first), deletes all archives attached to the bus, and finally
    deletes the bus itself. All cleanup is best-effort and logged on failure.
    """
    event_bus_names = []

    def _create_event_bus(**kwargs):
        if "Name" not in kwargs:
            kwargs["Name"] = f"test-event-bus-{short_uid()}"

        response = aws_client.events.create_event_bus(**kwargs)
        event_bus_names.append(kwargs["Name"])
        return response

    yield _create_event_bus

    for event_bus_name in event_bus_names:
        try:
            response = aws_client.events.list_rules(EventBusName=event_bus_name)
            rules = [rule["Name"] for rule in response["Rules"]]

            # Delete all rules for the current event bus
            for rule in rules:
                try:
                    response = aws_client.events.list_targets_by_rule(
                        Rule=rule, EventBusName=event_bus_name
                    )
                    targets = [target["Id"] for target in response["Targets"]]

                    # Remove all targets for the current rule (a rule cannot be
                    # deleted while it still has targets attached)
                    if targets:
                        for target in targets:
                            aws_client.events.remove_targets(
                                Rule=rule, EventBusName=event_bus_name, Ids=[target]
                            )

                    aws_client.events.delete_rule(Name=rule, EventBusName=event_bus_name)
                except Exception as e:
                    LOG.warning("Failed to delete rule %s: %s", rule, e)

            # Delete archives for the event bus. Derive the partition from the
            # region instead of hard-coding "aws", so cleanup also works in
            # non-default partitions (e.g. aws-cn, aws-us-gov).
            event_source_arn = (
                f"arn:{get_partition(region_name)}:events:{region_name}:{account_id}:"
                f"event-bus/{event_bus_name}"
            )
            response = aws_client.events.list_archives(EventSourceArn=event_source_arn)
            archives = [archive["ArchiveName"] for archive in response["Archives"]]
            for archive in archives:
                try:
                    aws_client.events.delete_archive(ArchiveName=archive)
                except Exception as e:
                    LOG.warning("Failed to delete archive %s: %s", archive, e)

            aws_client.events.delete_event_bus(Name=event_bus_name)
        except Exception as e:
            LOG.warning("Failed to delete event bus %s: %s", event_bus_name, e)
1✔
2494

2495

2496
@pytest.fixture
def events_put_rule(aws_client):
    """Factory fixture wrapping events.put_rule.

    On teardown, removes each rule's targets and deletes the rule (best effort,
    failures are logged).
    """
    created_rules = []

    def _put_rule(**kwargs):
        if "Name" not in kwargs:
            kwargs["Name"] = f"rule-{short_uid()}"

        response = aws_client.events.put_rule(**kwargs)
        created_rules.append((kwargs["Name"], kwargs.get("EventBusName", "default")))
        return response

    yield _put_rule

    for rule_name, bus_name in created_rules:
        try:
            target_ids = [
                target["Id"]
                for target in aws_client.events.list_targets_by_rule(
                    Rule=rule_name, EventBusName=bus_name
                )["Targets"]
            ]

            # A rule cannot be deleted while it still has targets attached.
            for target_id in target_ids:
                aws_client.events.remove_targets(
                    Rule=rule_name, EventBusName=bus_name, Ids=[target_id]
                )

            aws_client.events.delete_rule(Name=rule_name, EventBusName=bus_name)
        except Exception as e:
            LOG.warning("Failed to delete rule %s: %s", rule_name, e)
1✔
2527

2528

2529
@pytest.fixture
def events_create_rule(aws_client):
    """Factory fixture creating EventBridge rules from an event pattern and/or
    schedule expression; returns the rule ARN.

    On teardown, removes each rule's targets and deletes the rule.
    """
    created_rules = []

    def _create_rule(**kwargs):
        rule_name = kwargs["Name"]
        bus_name = kwargs.get("EventBusName", "")
        rule_arn = aws_client.events.put_rule(
            Name=rule_name,
            EventBusName=bus_name,
            EventPattern=json.dumps(kwargs.get("EventPattern", {})),
            ScheduleExpression=kwargs.get("ScheduleExpression", ""),
        )["RuleArn"]
        created_rules.append({"name": rule_name, "bus": bus_name})
        return rule_arn

    yield _create_rule

    for entry in created_rules:
        targets = aws_client.events.list_targets_by_rule(
            Rule=entry["name"], EventBusName=entry["bus"]
        )["Targets"]

        # Targets must be removed before the rule itself can be deleted.
        target_ids = [target["Id"] for target in targets]
        if target_ids:
            aws_client.events.remove_targets(
                Rule=entry["name"], EventBusName=entry["bus"], Ids=target_ids
            )

        aws_client.events.delete_rule(Name=entry["name"], EventBusName=entry["bus"])
1✔
2561

2562

2563
@pytest.fixture
def sqs_as_events_target(aws_client, sqs_get_queue_arn):
    """Factory fixture creating an SQS queue that EventBridge may deliver to.

    Returns a (queue_url, queue_arn) tuple; created queues are deleted on
    teardown (best effort).
    """
    created_queue_urls = []

    def _sqs_as_events_target(queue_name: str | None = None) -> tuple[str, str]:
        queue_url = aws_client.sqs.create_queue(
            QueueName=queue_name or f"tests-queue-{short_uid()}"
        )["QueueUrl"]
        created_queue_urls.append(queue_url)
        queue_arn = sqs_get_queue_arn(queue_url)

        # Attach a queue policy granting EventBridge permission to send messages.
        policy = {
            "Version": "2012-10-17",
            "Id": f"sqs-eventbridge-{short_uid()}",
            "Statement": [
                {
                    "Sid": f"SendMessage-{short_uid()}",
                    "Effect": "Allow",
                    "Principal": {"Service": "events.amazonaws.com"},
                    "Action": "sqs:SendMessage",
                    "Resource": queue_arn,
                }
            ],
        }
        aws_client.sqs.set_queue_attributes(
            QueueUrl=queue_url, Attributes={"Policy": json.dumps(policy)}
        )
        return queue_url, queue_arn

    yield _sqs_as_events_target

    for queue_url in created_queue_urls:
        try:
            aws_client.sqs.delete_queue(QueueUrl=queue_url)
        except Exception as e:
            LOG.debug("error cleaning up queue %s: %s", queue_url, e)
×
2599

2600

2601
@pytest.fixture
def clean_up(
    aws_client,
):  # TODO: legacy clean up fixtures for eventbridge - remove and use individual fixtures for creating resources instead
    def _clean_up(
        bus_name=None,
        rule_name=None,
        target_ids=None,
        queue_url=None,
        log_group_name=None,
    ):
        """Best-effort cleanup of EventBridge-related test resources.

        Each argument is optional; only the resources whose identifiers are
        given are cleaned up. All deletions go through call_safe, so failures
        are swallowed rather than failing the test teardown.
        """
        events_client = aws_client.events
        # Scope rule/target operations to the given bus; empty kwargs targets
        # the default bus.
        kwargs = {"EventBusName": bus_name} if bus_name else {}
        if target_ids:
            # Accept a single target ID or a list of IDs.
            target_ids = target_ids if isinstance(target_ids, list) else [target_ids]
            # Targets must be removed before the rule can be deleted.
            call_safe(
                events_client.remove_targets,
                kwargs=dict(Rule=rule_name, Ids=target_ids, Force=True, **kwargs),
            )
        if rule_name:
            call_safe(events_client.delete_rule, kwargs=dict(Name=rule_name, Force=True, **kwargs))
        if bus_name:
            # Delete the bus last, after its rules/targets are gone.
            call_safe(events_client.delete_event_bus, kwargs=dict(Name=bus_name))
        if queue_url:
            sqs_client = aws_client.sqs
            call_safe(sqs_client.delete_queue, kwargs=dict(QueueUrl=queue_url))
        if log_group_name:
            logs_client = aws_client.logs

            def _delete_log_group():
                # Delete all log streams first, then the group itself.
                log_streams = logs_client.describe_log_streams(logGroupName=log_group_name)
                for log_stream in log_streams["logStreams"]:
                    logs_client.delete_log_stream(
                        logGroupName=log_group_name, logStreamName=log_stream["logStreamName"]
                    )
                logs_client.delete_log_group(logGroupName=log_group_name)

            call_safe(_delete_log_group)

    yield _clean_up
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc