• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / a0150a35-50a8-4c2f-b8f7-cda83b0b938f

03 Jun 2025 05:34PM UTC coverage: 86.768%. Remained the same
a0150a35-50a8-4c2f-b8f7-cda83b0b938f

push

circleci

web-flow
CloudFormation v2 Engine: Base Support for Fn::Base64 (#12700)

20 of 22 new or added lines in 3 files covered. (90.91%)

93 existing lines in 11 files now uncovered.

65077 of 75001 relevant lines covered (86.77%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.46
/localstack-core/localstack/testing/pytest/fixtures.py
1
import contextlib
1✔
2
import dataclasses
1✔
3
import json
1✔
4
import logging
1✔
5
import os
1✔
6
import re
1✔
7
import textwrap
1✔
8
import time
1✔
9
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple
1✔
10

11
import botocore.auth
1✔
12
import botocore.config
1✔
13
import botocore.credentials
1✔
14
import botocore.session
1✔
15
import pytest
1✔
16
from _pytest.config import Config
1✔
17
from _pytest.nodes import Item
1✔
18
from botocore.exceptions import ClientError
1✔
19
from botocore.regions import EndpointResolver
1✔
20
from pytest_httpserver import HTTPServer
1✔
21
from werkzeug import Request, Response
1✔
22

23
from localstack import config
1✔
24
from localstack.aws.api.ec2 import CreateSecurityGroupRequest
1✔
25
from localstack.aws.connect import ServiceLevelClientFactory
1✔
26
from localstack.services.stores import (
1✔
27
    AccountRegionBundle,
28
    BaseStore,
29
    CrossAccountAttribute,
30
    CrossRegionAttribute,
31
    LocalAttribute,
32
)
33
from localstack.testing.aws.cloudformation_utils import load_template_file, render_template
1✔
34
from localstack.testing.aws.util import get_lambda_logs, is_aws_cloud
1✔
35
from localstack.testing.config import (
1✔
36
    SECONDARY_TEST_AWS_ACCOUNT_ID,
37
    SECONDARY_TEST_AWS_REGION_NAME,
38
    TEST_AWS_ACCOUNT_ID,
39
    TEST_AWS_REGION_NAME,
40
)
41
from localstack.utils import testutil
1✔
42
from localstack.utils.aws.arns import get_partition
1✔
43
from localstack.utils.aws.client import SigningHttpClient
1✔
44
from localstack.utils.aws.resources import create_dynamodb_table
1✔
45
from localstack.utils.bootstrap import is_api_enabled
1✔
46
from localstack.utils.collections import ensure_list, select_from_typed_dict
1✔
47
from localstack.utils.functions import call_safe, run_safe
1✔
48
from localstack.utils.http import safe_requests as requests
1✔
49
from localstack.utils.id_generator import ResourceIdentifier, localstack_id_manager
1✔
50
from localstack.utils.json import CustomEncoder, json_safe
1✔
51
from localstack.utils.net import wait_for_port_open
1✔
52
from localstack.utils.strings import short_uid, to_str
1✔
53
from localstack.utils.sync import ShortCircuitWaitException, poll_condition, retry, wait_until
1✔
54

55
LOG = logging.getLogger(__name__)
1✔
56

57
# URL of public HTTP echo server, used primarily for AWS parity/snapshot testing
58
PUBLIC_HTTP_ECHO_SERVER_URL = "http://httpbin.org"
1✔
59

60
WAITER_CHANGE_SET_CREATE_COMPLETE = "change_set_create_complete"
1✔
61
WAITER_STACK_CREATE_COMPLETE = "stack_create_complete"
1✔
62
WAITER_STACK_UPDATE_COMPLETE = "stack_update_complete"
1✔
63
WAITER_STACK_DELETE_COMPLETE = "stack_delete_complete"
1✔
64

65

66
if TYPE_CHECKING:
1✔
67
    from mypy_boto3_sqs import SQSClient
×
68
    from mypy_boto3_sqs.type_defs import MessageTypeDef
×
69

70

71
@pytest.fixture(scope="session")
def aws_client_no_retry(aws_client_factory):
    """
    Provide Boto clients whose retry behavior is disabled (a single attempt only).

    Prefer this client when asserting on exceptions (i.e., with pytest.raises(...)) or
    expected errors (e.g., status code 500), so retries do not slow tests down or make
    time-bound error conditions flaky.
    botocore docs: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#configuring-a-retry-mode

    The legacy retry mode would otherwise retry the following errors, exceptions, and
    HTTP status codes:
    https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#legacy-retry-mode

    General socket/connection errors:
    * ConnectionError
    * ConnectionClosedError
    * ReadTimeoutError
    * EndpointConnectionError

    Service-side throttling/limit errors and exceptions:
    * Throttling
    * ThrottlingException
    * ThrottledException
    * RequestThrottledException
    * ProvisionedThroughputExceededException

    HTTP status codes: 429, 500, 502, 503, 504, and 509

    Hence, this client is not needed for a `ResourceNotFound` error (but it doesn't harm).
    """
    single_attempt = botocore.config.Config(retries={"max_attempts": 1})
    return aws_client_factory(config=single_attempt)
101

102

103
@pytest.fixture(scope="class")
def aws_http_client_factory(aws_session):
    """
    Returns a factory for creating new ``SigningHttpClient`` instances using a configurable
    botocore request signer. The default signer is a SigV4QueryAuth. The credentials are
    extracted from the ``aws_session`` fixture, which transparently uses your global profile
    when TEST_TARGET=AWS_CLOUD, or test credentials when running against LocalStack.

    Example invocations

        client = aws_http_client_factory("sqs")
        client.get("http://localhost:4566/000000000000/my-queue")

    or
        client = aws_http_client_factory("dynamodb", signer_factory=SigV4Auth)
        client.post("...")
    """

    def factory(
        service: str,
        region: str = None,
        signer_factory: Callable[
            [botocore.credentials.Credentials, str, str], botocore.auth.BaseSigner
        ] = botocore.auth.SigV4QueryAuth,
        endpoint_url: str = None,
        aws_access_key_id: str = None,
        aws_secret_access_key: str = None,
    ):
        region = region or TEST_AWS_REGION_NAME

        # explicitly passed keys take precedence over the session's credentials
        if not (aws_access_key_id or aws_secret_access_key):
            credentials = aws_session.get_credentials()
        else:
            credentials = botocore.credentials.Credentials(
                access_key=aws_access_key_id, secret_key=aws_secret_access_key
            )
        frozen_creds = credentials.get_frozen_credentials()

        if not endpoint_url:
            if not is_aws_cloud():
                endpoint_url = config.internal_service_url()
            else:
                # FIXME: this is a bit raw. we should probably re-use boto in a better way
                resolver: EndpointResolver = aws_session._session.get_component("endpoint_resolver")
                endpoint_url = "https://" + resolver.construct_endpoint(service, region)["hostname"]

        signer = signer_factory(frozen_creds, service, region)
        return SigningHttpClient(signer, endpoint_url=endpoint_url)

    return factory
153

154

155
@pytest.fixture(scope="class")
def s3_vhost_client(aws_client_factory, region_name):
    """Return an S3 client configured to use virtual-host-style bucket addressing."""
    vhost_config = botocore.config.Config(s3={"addressing_style": "virtual"})
    clients = aws_client_factory(config=vhost_config, region_name=region_name)
    return clients.s3
160

161

162
@pytest.fixture
def dynamodb_wait_for_table_active(aws_client):
    """Factory: block (up to 30s) until the given DynamoDB table reaches ACTIVE status.

    An alternative DynamoDB client may be supplied via ``client``.
    """

    def wait_for_table_active(table_name: str, client=None):
        ddb_client = client or aws_client.dynamodb

        def _is_active():
            table = ddb_client.describe_table(TableName=table_name)["Table"]
            return table["TableStatus"] == "ACTIVE"

        poll_condition(_is_active, timeout=30)

    return wait_for_table_active
173

174

175
@pytest.fixture
def dynamodb_create_table_with_parameters(dynamodb_wait_for_table_active, aws_client):
    """Factory fixture: create DynamoDB tables from raw CreateTable kwargs (random name by
    default), wait until each is ACTIVE, and delete them all on teardown (best effort)."""
    created_tables = []

    def factory(**kwargs):
        if "TableName" not in kwargs:
            kwargs["TableName"] = f"test-table-{short_uid()}"
        table_name = kwargs["TableName"]

        # record before creating, so a failed create still gets a cleanup attempt
        created_tables.append(table_name)
        response = aws_client.dynamodb.create_table(**kwargs)
        dynamodb_wait_for_table_active(table_name)
        return response

    yield factory

    # cleanup
    for table_name in created_tables:
        try:
            # deletion requires the table to be in ACTIVE state
            dynamodb_wait_for_table_active(table_name)
            aws_client.dynamodb.delete_table(TableName=table_name)
        except Exception as e:
            LOG.debug("error cleaning up table %s: %s", table_name, e)
198

199

200
@pytest.fixture
def dynamodb_create_table(dynamodb_wait_for_table_active, aws_client):
    """Factory fixture: create DynamoDB tables via the ``create_dynamodb_table`` utility
    (random name and ``id`` partition key by default) and delete them on teardown.

    Beware: this swallows exceptions raised inside the ``create_dynamodb_table`` utility.
    """
    created_tables = []

    def factory(**kwargs):
        kwargs["client"] = aws_client.dynamodb
        if "table_name" not in kwargs:
            kwargs["table_name"] = f"test-table-{short_uid()}"
        if "partition_key" not in kwargs:
            kwargs["partition_key"] = "id"

        created_tables.append(kwargs["table_name"])

        return create_dynamodb_table(**kwargs)

    yield factory

    # cleanup
    for table_name in created_tables:
        try:
            # deletion requires the table to be in ACTIVE state
            dynamodb_wait_for_table_active(table_name)
            aws_client.dynamodb.delete_table(TableName=table_name)
        except Exception as e:
            LOG.debug("error cleaning up table %s: %s", table_name, e)
226

227

228
@pytest.fixture
def s3_create_bucket(s3_empty_bucket, aws_client):
    """Factory fixture: create S3 buckets (random name by default) and return the bucket name.
    On teardown every created bucket is emptied and deleted (best effort)."""
    created_buckets = []

    def factory(**kwargs) -> str:
        if "Bucket" not in kwargs:
            kwargs["Bucket"] = "test-bucket-%s" % short_uid()

        client_region = aws_client.s3.meta.region_name
        # outside us-east-1, CreateBucket needs an explicit LocationConstraint
        if client_region != "us-east-1" and "CreateBucketConfiguration" not in kwargs:
            kwargs["CreateBucketConfiguration"] = {"LocationConstraint": client_region}

        aws_client.s3.create_bucket(**kwargs)
        bucket_name = kwargs["Bucket"]
        created_buckets.append(bucket_name)
        return bucket_name

    yield factory

    # cleanup
    for bucket_name in created_buckets:
        try:
            s3_empty_bucket(bucket_name)
            aws_client.s3.delete_bucket(Bucket=bucket_name)
        except Exception as e:
            LOG.debug("error cleaning up bucket %s: %s", bucket_name, e)
257

258

259
@pytest.fixture
def s3_create_bucket_with_client(s3_empty_bucket, aws_client):
    """Like ``s3_create_bucket``, but the caller supplies the S3 client used for creation
    and receives the raw CreateBucket response. Cleanup uses the default client."""
    created_buckets = []

    def factory(s3_client, **kwargs) -> str:
        if "Bucket" not in kwargs:
            kwargs["Bucket"] = f"test-bucket-{short_uid()}"

        response = s3_client.create_bucket(**kwargs)
        created_buckets.append(kwargs["Bucket"])
        return response

    yield factory

    # cleanup
    for bucket_name in created_buckets:
        try:
            s3_empty_bucket(bucket_name)
            aws_client.s3.delete_bucket(Bucket=bucket_name)
        except Exception as e:
            LOG.debug("error cleaning up bucket %s: %s", bucket_name, e)
280

281

282
@pytest.fixture
def s3_bucket(s3_create_bucket, aws_client) -> str:
    """A single test bucket (created with a LocationConstraint outside us-east-1);
    returns the bucket name."""
    region = aws_client.s3.meta.region_name
    if region == "us-east-1":
        create_kwargs = {}
    else:
        create_kwargs = {"CreateBucketConfiguration": {"LocationConstraint": region}}
    return s3_create_bucket(**create_kwargs)
289

290

291
@pytest.fixture
def s3_empty_bucket(aws_client):
    """
    Returns a factory that, given a bucket name, deletes all objects and all object
    versions (including delete markers) in that bucket.
    """

    # Boto resource would make this a straightforward task, but our internal client does not
    # support Boto resource. ListObjectsV2/ListObjectVersions return at most 1000 entries per
    # page and DeleteObjects accepts at most 1000 keys per request, so both listing and
    # deletion are paginated (fixes the previous single-page limitation).
    def _delete_in_chunks(bucket_name: str, objects: list, delete_kwargs: dict):
        # DeleteObjects rejects requests with more than 1000 keys
        for i in range(0, len(objects), 1000):
            aws_client.s3.delete_objects(
                Bucket=bucket_name,
                Delete={"Objects": objects[i : i + 1000]},
                **delete_kwargs,
            )

    def factory(bucket_name: str):
        delete_kwargs = {}
        try:
            # if object lock is configured, governance-retained objects need the bypass flag
            aws_client.s3.get_object_lock_configuration(Bucket=bucket_name)
            delete_kwargs["BypassGovernanceRetention"] = True
        except ClientError:
            pass

        # delete all current objects, page by page
        list_kwargs = {}
        while True:
            response = aws_client.s3.list_objects_v2(Bucket=bucket_name, **list_kwargs)
            objects = [{"Key": obj["Key"]} for obj in response.get("Contents", [])]
            if objects:
                _delete_in_chunks(bucket_name, objects, delete_kwargs)
            if not response.get("IsTruncated"):
                break
            list_kwargs["ContinuationToken"] = response["NextContinuationToken"]

        # delete all object versions and delete markers, page by page
        list_kwargs = {}
        while True:
            response = aws_client.s3.list_object_versions(Bucket=bucket_name, **list_kwargs)
            versions = response.get("Versions", []) + response.get("DeleteMarkers", [])
            object_versions = [
                {"Key": obj["Key"], "VersionId": obj["VersionId"]} for obj in versions
            ]
            if object_versions:
                _delete_in_chunks(bucket_name, object_versions, delete_kwargs)
            if not response.get("IsTruncated"):
                break
            list_kwargs = {"KeyMarker": response["NextKeyMarker"]}
            if "NextVersionIdMarker" in response:
                list_kwargs["VersionIdMarker"] = response["NextVersionIdMarker"]

    yield factory
329

330

331
@pytest.fixture
def sqs_create_queue(aws_client):
    """Factory fixture: create SQS queues (random name by default) and return the queue URL.
    All created queues are deleted on teardown (best effort)."""
    created_queue_urls = []

    def factory(**kwargs):
        if "QueueName" not in kwargs:
            kwargs["QueueName"] = "test-queue-%s" % short_uid()

        queue_url = aws_client.sqs.create_queue(**kwargs)["QueueUrl"]
        created_queue_urls.append(queue_url)

        return queue_url

    yield factory

    # cleanup
    for queue_url in created_queue_urls:
        try:
            aws_client.sqs.delete_queue(QueueUrl=queue_url)
        except Exception as e:
            LOG.debug("error cleaning up queue %s: %s", queue_url, e)
353

354

355
@pytest.fixture
def sqs_receive_messages_delete(aws_client):
    """Factory: receive messages from a queue, JSON-decode their bodies, delete the
    received messages, and return the decoded payloads.

    NOTE: raises ``KeyError`` when the response contains no ``Messages`` key — the
    ``sqs_receive_num_messages`` fixture relies on that behavior to detect empty polls.
    """

    def factory(
        queue_url: str,
        expected_messages: Optional[int] = None,
        wait_time: Optional[int] = 5,
    ):
        response = aws_client.sqs.receive_message(
            QueueUrl=queue_url,
            MessageAttributeNames=["All"],
            VisibilityTimeout=0,
            WaitTimeSeconds=wait_time,
        )
        # raises KeyError when nothing was received (deliberate, see fixture docstring)
        raw_messages = response["Messages"]
        decoded = [json.loads(to_str(raw["Body"])) for raw in raw_messages]

        if expected_messages is not None:
            assert len(decoded) == expected_messages

        for raw in raw_messages:
            aws_client.sqs.delete_message(
                QueueUrl=queue_url, ReceiptHandle=raw["ReceiptHandle"]
            )

        return decoded

    return factory
384

385

386
@pytest.fixture
def sqs_receive_num_messages(sqs_receive_messages_delete):
    """Factory: repeatedly poll a queue (at most ``max_iterations`` receive calls) until at
    least ``expected_messages`` have arrived, then return exactly that many."""

    def factory(queue_url: str, expected_messages: int, max_iterations: int = 3):
        all_messages = []
        for _ in range(max_iterations):
            try:
                batch = sqs_receive_messages_delete(queue_url, wait_time=5)
            except KeyError:
                # the response carried no "Messages" key, i.e. the poll was empty
                continue
            all_messages.extend(batch)

            if len(all_messages) >= expected_messages:
                return all_messages[:expected_messages]

        raise AssertionError(f"max iterations reached with {len(all_messages)} messages received")

    return factory
404

405

406
@pytest.fixture
def sqs_collect_messages(aws_client):
    """Collects SQS messages from a given queue_url and deletes them by default.

    Example usage:

        messages = sqs_collect_messages(
            my_queue_url,
            expected=2,
            timeout=10,
            attribute_names=["All"],
            message_attribute_names=["All"],
        )
    """

    def factory(
        queue_url: str,
        expected: int,
        timeout: int,
        delete: bool = True,
        attribute_names: list[str] = None,
        message_attribute_names: list[str] = None,
        max_number_of_messages: int = 1,
        wait_time_seconds: int = 5,
        sqs_client: "SQSClient | None" = None,
    ) -> list["MessageTypeDef"]:
        client = sqs_client or aws_client.sqs
        collected = []

        def _poll_once():
            response = client.receive_message(
                QueueUrl=queue_url,
                # long polling; SQS allows at most 20 seconds
                WaitTimeSeconds=wait_time_seconds,
                # SQS returns at most 10 messages per call
                MaxNumberOfMessages=max_number_of_messages,
                AttributeNames=attribute_names or [],
                MessageAttributeNames=message_attribute_names or [],
            )

            messages = response.get("Messages")
            if messages:
                collected.extend(messages)

                if delete:
                    for message in messages:
                        client.delete_message(
                            QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"]
                        )

            return len(collected) >= expected

        if not poll_condition(_poll_once, timeout=timeout):
            raise TimeoutError(
                f"gave up waiting for messages (expected={expected}, actual={len(collected)}"
            )

        return collected

    yield factory
463

464

465
@pytest.fixture
def sqs_queue(sqs_create_queue):
    """A single SQS queue with a generated name; returns its queue URL."""
    return sqs_create_queue()
468

469

470
@pytest.fixture
def sqs_get_queue_arn(aws_client) -> Callable:
    """Factory: resolve a queue URL to its ARN via GetQueueAttributes."""

    def _get_queue_arn(queue_url: str) -> str:
        attributes = aws_client.sqs.get_queue_attributes(
            QueueUrl=queue_url, AttributeNames=["QueueArn"]
        )["Attributes"]
        return attributes["QueueArn"]

    return _get_queue_arn
478

479

480
@pytest.fixture
def sqs_queue_exists(aws_client):
    """Factory: check whether the queue behind a queue URL still exists."""

    def _queue_exists(queue_url: str) -> bool:
        """
        Checks whether a queue with the given queue URL exists.
        :param queue_url: the queue URL
        :return: true if the queue exists, false otherwise
        """
        # the queue name is the last path segment of the URL
        queue_name = queue_url.split("/")[-1]
        try:
            result = aws_client.sqs.get_queue_url(QueueName=queue_name)
        except ClientError as e:
            if "NonExistentQueue" in e.response["Error"]["Code"]:
                return False
            raise
        return result.get("QueueUrl") == queue_url

    yield _queue_exists
497

498

499
@pytest.fixture
def sns_create_topic(aws_client):
    """Factory fixture: create SNS topics (random name by default) and return the
    CreateTopic response. Topics are deleted on teardown (best effort)."""
    created_topic_arns = []

    def _create_topic(**kwargs):
        if "Name" not in kwargs:
            kwargs["Name"] = "test-topic-%s" % short_uid()
        response = aws_client.sns.create_topic(**kwargs)
        created_topic_arns.append(response["TopicArn"])
        return response

    yield _create_topic

    # cleanup
    for topic_arn in created_topic_arns:
        try:
            aws_client.sns.delete_topic(TopicArn=topic_arn)
        except Exception as e:
            LOG.debug("error cleaning up topic %s: %s", topic_arn, e)
517

518

519
@pytest.fixture
def sns_wait_for_topic_delete(aws_client):
    """Factory: block (up to 30s) until the given SNS topic no longer exists.

    Polls GetTopicAttributes until it fails with a NotFound error.
    """

    def wait_for_topic_delete(topic_arn: str) -> None:
        def wait():
            try:
                aws_client.sns.get_topic_attributes(TopicArn=topic_arn)
                return False
            # catch ClientError specifically: the previous broad `except Exception`
            # accessed `e.response` and raised AttributeError for any non-botocore
            # error (e.g. a connection error), masking the real failure
            except ClientError as e:
                if "NotFound" in e.response["Error"]["Code"]:
                    return True

                raise

        poll_condition(wait, timeout=30)

    return wait_for_topic_delete
535

536

537
@pytest.fixture
def sns_subscription(aws_client):
    """Factory fixture: create SNS subscriptions (requires 'TopicArn', 'Protocol', and
    'Endpoint' kwargs) and unsubscribe them on teardown. ``ReturnSubscriptionArn``
    defaults to True so the real ARN is returned immediately."""
    subscription_arns = []

    def _create_sub(**kwargs):
        if kwargs.get("ReturnSubscriptionArn") is None:
            kwargs["ReturnSubscriptionArn"] = True

        response = aws_client.sns.subscribe(**kwargs)
        subscription_arns.append(response["SubscriptionArn"])
        return response

    yield _create_sub

    # cleanup
    for sub_arn in subscription_arns:
        try:
            aws_client.sns.unsubscribe(SubscriptionArn=sub_arn)
        except Exception as e:
            LOG.debug("error cleaning up subscription %s: %s", sub_arn, e)
558

559

560
@pytest.fixture
def sns_topic(sns_create_topic, aws_client):
    """A single SNS topic, returned as its GetTopicAttributes response."""
    arn = sns_create_topic()["TopicArn"]
    return aws_client.sns.get_topic_attributes(TopicArn=arn)
564

565

566
@pytest.fixture
def sns_allow_topic_sqs_queue(aws_client):
    """Factory: set a queue policy that allows the given SNS topic to send messages
    to the given SQS queue."""

    def _allow_sns_topic(sqs_queue_url, sqs_queue_arn, sns_topic_arn) -> None:
        # allow topic to write to sqs queue
        statement = {
            "Effect": "Allow",
            "Principal": {"Service": "sns.amazonaws.com"},
            "Action": "sqs:SendMessage",
            "Resource": sqs_queue_arn,
            "Condition": {"ArnEquals": {"aws:SourceArn": sns_topic_arn}},
        }
        policy_document = json.dumps({"Statement": [statement]})
        aws_client.sqs.set_queue_attributes(
            QueueUrl=sqs_queue_url,
            Attributes={"Policy": policy_document},
        )

    return _allow_sns_topic
590

591

592
@pytest.fixture
def sns_create_sqs_subscription(sns_allow_topic_sqs_queue, sqs_get_queue_arn, aws_client):
    """Factory: subscribe an SQS queue to an SNS topic and grant the topic permission to
    write to the queue. Returns the subscription's attributes; unsubscribes on teardown."""
    subscription_arns = []

    def _factory(topic_arn: str, queue_url: str, **kwargs) -> Dict[str, str]:
        queue_arn = sqs_get_queue_arn(queue_url)

        # connect sns topic to sqs
        subscription_arn = aws_client.sns.subscribe(
            TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn, **kwargs
        )["SubscriptionArn"]

        # allow topic to write to sqs queue
        sns_allow_topic_sqs_queue(
            sqs_queue_url=queue_url, sqs_queue_arn=queue_arn, sns_topic_arn=topic_arn
        )

        subscription_arns.append(subscription_arn)
        attributes = aws_client.sns.get_subscription_attributes(
            SubscriptionArn=subscription_arn
        )
        return attributes["Attributes"]

    yield _factory

    # cleanup
    for subscription_arn in subscription_arns:
        try:
            aws_client.sns.unsubscribe(SubscriptionArn=subscription_arn)
        except Exception as e:
            LOG.error("error cleaning up subscription %s: %s", subscription_arn, e)
622

623

624
@pytest.fixture
def sns_create_http_endpoint(sns_create_topic, sns_subscription, aws_client):
    """Factory: start a local HTTP server, subscribe it to a fresh SNS topic, and return
    ``(topic_arn, subscription_arn, endpoint_url, server)``. Servers are stopped on teardown.

    This fixture can be used with manual setup to expose the HTTPServer fixture to AWS. One
    example is to use a service like localhost.run, and set up a specific port to start the
    ``HTTPServer(port=40000)`` for example, and tunnel ``localhost:40000`` to a specific
    domain that you can manually return from this fixture.
    """
    http_servers = []

    def _create_http_endpoint(
        raw_message_delivery: bool = False,
    ) -> Tuple[str, str, str, HTTPServer]:
        server = HTTPServer()
        server.start()
        http_servers.append(server)
        server.expect_request("/sns-endpoint").respond_with_data(status=200)
        endpoint_url = server.url_for("/sns-endpoint")
        wait_for_port_open(endpoint_url)

        topic_arn = sns_create_topic()["TopicArn"]
        subscription = sns_subscription(TopicArn=topic_arn, Protocol="http", Endpoint=endpoint_url)
        subscription_arn = subscription["SubscriptionArn"]

        # disable redelivery so tests don't have to wait on retry attempts
        delivery_policy = {
            "healthyRetryPolicy": {
                "minDelayTarget": 1,
                "maxDelayTarget": 1,
                "numRetries": 0,
                "numNoDelayRetries": 0,
                "numMinDelayRetries": 0,
                "numMaxDelayRetries": 0,
                "backoffFunction": "linear",
            },
            "sicklyRetryPolicy": None,
            "throttlePolicy": {"maxReceivesPerSecond": 1000},
            "guaranteed": False,
        }
        attribute_updates = {"DeliveryPolicy": json.dumps(delivery_policy)}
        if raw_message_delivery:
            attribute_updates["RawMessageDelivery"] = "true"

        for attribute_name, attribute_value in attribute_updates.items():
            aws_client.sns.set_subscription_attributes(
                SubscriptionArn=subscription_arn,
                AttributeName=attribute_name,
                AttributeValue=attribute_value,
            )

        return topic_arn, subscription_arn, endpoint_url, server

    yield _create_http_endpoint

    for server in http_servers:
        if server.is_running():
            server.stop()
678

679

680
@pytest.fixture
def route53_hosted_zone(aws_client):
    """Factory fixture: create Route53 hosted zones (random name and caller reference by
    default) and delete them on teardown (best effort)."""
    zone_ids = []

    def factory(**kwargs):
        if "Name" not in kwargs:
            kwargs["Name"] = f"www.{short_uid()}.com."
        if "CallerReference" not in kwargs:
            kwargs["CallerReference"] = f"caller-ref-{short_uid()}"
        # note: only Name and CallerReference are forwarded to the API call
        response = aws_client.route53.create_hosted_zone(
            Name=kwargs["Name"], CallerReference=kwargs["CallerReference"]
        )
        zone_ids.append(response["HostedZone"]["Id"])
        return response

    yield factory

    # cleanup
    for zone_id in zone_ids:
        try:
            aws_client.route53.delete_hosted_zone(Id=zone_id)
        except Exception as e:
            LOG.debug("error cleaning up route53 HostedZone %s: %s", zone_id, e)
702

703

704
@pytest.fixture
def transcribe_create_job(s3_bucket, aws_client):
    """Factory: upload a local audio file to the test bucket and start a transcription job
    on it. Returns the job name; jobs are deleted on teardown (best effort)."""
    job_names = []

    def _create_job(audio_file: str, params: Optional[dict[str, Any]] = None) -> str:
        s3_key = "test-clip.wav"

        if not params:
            params = {}

        if "TranscriptionJobName" not in params:
            params["TranscriptionJobName"] = f"test-transcribe-{short_uid()}"
        if "LanguageCode" not in params:
            params["LanguageCode"] = "en-GB"
        if "Media" not in params:
            params["Media"] = {"MediaFileUri": f"s3://{s3_bucket}/{s3_key}"}

        # upload the test clip so the Media URI points at an existing object
        with open(audio_file, "rb") as f:
            aws_client.s3.upload_fileobj(f, s3_bucket, s3_key)

        response = aws_client.transcribe.start_transcription_job(**params)
        job_name = response["TranscriptionJob"]["TranscriptionJobName"]
        job_names.append(job_name)

        return job_name

    yield _create_job

    for job_name in job_names:
        # the job may already be gone; ignore client errors during cleanup
        with contextlib.suppress(ClientError):
            aws_client.transcribe.delete_transcription_job(TranscriptionJobName=job_name)
739

740

741
@pytest.fixture
def kinesis_create_stream(aws_client):
    """Factory fixture: create Kinesis streams (random name by default) and return the
    stream name. Streams (including their consumers) are force-deleted on teardown."""
    stream_names = []

    def _create_stream(**kwargs):
        if "StreamName" not in kwargs:
            kwargs["StreamName"] = f"test-stream-{short_uid()}"
        stream_name = kwargs["StreamName"]
        aws_client.kinesis.create_stream(**kwargs)
        stream_names.append(stream_name)
        return stream_name

    yield _create_stream

    # cleanup
    for stream_name in stream_names:
        try:
            aws_client.kinesis.delete_stream(StreamName=stream_name, EnforceConsumerDeletion=True)
        except Exception as e:
            LOG.debug("error cleaning up kinesis stream %s: %s", stream_name, e)
759

760

761
@pytest.fixture
def wait_for_stream_ready(aws_client):
    """Factory: poll until a Kinesis stream reaches ACTIVE or UPDATING status.
    Returns the poll result (False on timeout)."""

    def _wait_for_stream_ready(stream_name: str):
        def is_stream_ready():
            description = aws_client.kinesis.describe_stream(StreamName=stream_name)
            status = description["StreamDescription"]["StreamStatus"]
            return status in ("ACTIVE", "UPDATING")

        return poll_condition(is_stream_ready)

    return _wait_for_stream_ready
774

775

776
@pytest.fixture
def wait_for_delivery_stream_ready(aws_client):
    """Factory: poll until a Firehose delivery stream reaches ACTIVE status."""

    def _wait_for_stream_ready(delivery_stream_name: str):
        def is_stream_ready():
            description = aws_client.firehose.describe_delivery_stream(
                DeliveryStreamName=delivery_stream_name
            )["DeliveryStreamDescription"]
            return description["DeliveryStreamStatus"] == "ACTIVE"

        # note: unlike the kinesis variant, the poll result is intentionally not returned
        poll_condition(is_stream_ready)

    return _wait_for_stream_ready
791

792

793
@pytest.fixture
def wait_for_dynamodb_stream_ready(aws_client):
    """Factory: poll until a DynamoDB stream reaches ENABLED status. An alternative
    dynamodbstreams client may be supplied. Returns the poll result (False on timeout)."""

    def _wait_for_stream_ready(stream_arn: str, client=None):
        def is_stream_ready():
            ddb_streams = client or aws_client.dynamodbstreams
            description = ddb_streams.describe_stream(StreamArn=stream_arn)
            return description["StreamDescription"]["StreamStatus"] == "ENABLED"

        return poll_condition(is_stream_ready)

    return _wait_for_stream_ready
804

805

806
@pytest.fixture()
def kms_create_key(aws_client_factory):
    """Factory: create KMS keys (optionally in a specific region) and return the key's
    metadata. All created keys are scheduled for deletion on teardown."""
    created_keys = []

    def _create_key(region_name: str = None, **kwargs):
        if "Description" not in kwargs:
            kwargs["Description"] = f"test description - {short_uid()}"
        kms_client = aws_client_factory(region_name=region_name).kms
        key_metadata = kms_client.create_key(**kwargs)["KeyMetadata"]
        created_keys.append((region_name, key_metadata["KeyId"]))
        return key_metadata

    yield _create_key

    for region_name, key_id in created_keys:
        try:
            # 7 days is the shortest deletion window KMS allows
            aws_client_factory(region_name=region_name).kms.schedule_key_deletion(
                KeyId=key_id, PendingWindowInDays=7
            )
        except Exception as e:
            exception_message = str(e)
            # Some tests schedule their keys for deletion themselves; suppress logging
            # only for the resulting "KMSInvalidStateException ... is pending deletion"
            # failure, and log everything else.
            if (
                "KMSInvalidStateException" not in exception_message
                or "is pending deletion" not in exception_message
            ):
                LOG.debug("error cleaning up KMS key %s: %s", key_id, e)
835

836

837
@pytest.fixture()
def kms_replicate_key(aws_client_factory):
    """Factory fixture replicating KMS keys into another region; replicas are cleaned up."""
    replicas = []

    def _replicate_key(region_from=None, **kwargs):
        target_region = kwargs.get("ReplicaRegion")
        replicas.append((target_region, kwargs.get("KeyId")))
        return aws_client_factory(region_name=region_from).kms.replicate_key(**kwargs)

    yield _replicate_key

    for target_region, key_id in replicas:
        try:
            # 7 days is the shortest deletion window KMS allows.
            aws_client_factory(region_name=target_region).kms.schedule_key_deletion(
                KeyId=key_id, PendingWindowInDays=7
            )
        except Exception as e:
            LOG.debug("error cleaning up KMS key %s: %s", key_id, e)
856

857

858
# kms_create_key is requested here not only so aliases can be created without an explicit
# key, but also to order fixture setup/teardown: aliases must be cleaned up before the
# keys they point to are removed.
@pytest.fixture()
def kms_create_alias(kms_create_key, aws_client):
    """Factory fixture creating KMS aliases; aliases are deleted on teardown."""
    created_aliases = []

    def _create_alias(**kwargs):
        if "AliasName" not in kwargs:
            kwargs["AliasName"] = f"alias/{short_uid()}"
        if "TargetKeyId" not in kwargs:
            # Fall back to a freshly created key when no target was given.
            kwargs["TargetKeyId"] = kms_create_key()["KeyId"]

        aws_client.kms.create_alias(**kwargs)
        created_aliases.append(kwargs["AliasName"])
        return kwargs["AliasName"]

    yield _create_alias

    for alias in created_aliases:
        try:
            aws_client.kms.delete_alias(AliasName=alias)
        except Exception as e:
            LOG.debug("error cleaning up KMS alias %s: %s", alias, e)
882

883

884
@pytest.fixture()
def kms_create_grant(kms_create_key, aws_client):
    """Factory fixture creating KMS grants; grants are retired on teardown."""
    created_grants = []

    def _create_grant(**kwargs):
        # Just a random ARN, since KMS in LocalStack currently doesn't validate
        # GranteePrincipal, but some GranteePrincipal is required to create a grant.
        GRANTEE_PRINCIPAL_ARN = (
            "arn:aws:kms:eu-central-1:123456789876:key/198a5a78-52c3-489f-ac70-b06a4d11027a"
        )

        kwargs_defaults = [
            ("Operations", lambda: ["Decrypt", "Encrypt"]),
            ("GranteePrincipal", lambda: GRANTEE_PRINCIPAL_ARN),
            ("KeyId", lambda: kms_create_key()["KeyId"]),
        ]
        for key, make_default in kwargs_defaults:
            if key not in kwargs:
                kwargs[key] = make_default()

        grant_id = aws_client.kms.create_grant(**kwargs)["GrantId"]
        created_grants.append((grant_id, kwargs["KeyId"]))
        return grant_id, kwargs["KeyId"]

    yield _create_grant

    for grant_id, key_id in created_grants:
        try:
            aws_client.kms.retire_grant(GrantId=grant_id, KeyId=key_id)
        except Exception as e:
            LOG.debug("error cleaning up KMS grant %s: %s", grant_id, e)
913

914

915
@pytest.fixture
def kms_key(kms_create_key):
    """A single throwaway KMS key (cleanup handled by kms_create_key)."""
    return kms_create_key()
918

919

920
@pytest.fixture
def kms_grant_and_key(kms_key, aws_client):
    """A [grant, key] pair whose grant lets the current caller identity Decrypt/Encrypt."""
    caller_arn = aws_client.sts.get_caller_identity()["Arn"]

    grant = aws_client.kms.create_grant(
        KeyId=kms_key["KeyId"],
        GranteePrincipal=caller_arn,
        Operations=["Decrypt", "Encrypt"],
    )
    return [grant, kms_key]
932

933

934
@pytest.fixture
def opensearch_wait_for_cluster(aws_client):
    """Fixture factory blocking until an OpenSearch domain finishes processing."""

    def _wait_for_cluster(domain_name: str):
        def _cluster_ready():
            status = aws_client.opensearch.describe_domain(DomainName=domain_name)["DomainStatus"]
            return status["Processing"] is False and "Endpoint" in status

        # Against real AWS, clusters take a long time to come up; poll less aggressively.
        poll_kwargs = {"interval": 10} if is_aws_cloud() else {}
        assert poll_condition(_cluster_ready, timeout=25 * 60, **poll_kwargs), (
            f"could not start domain: {domain_name}"
        )

    return _wait_for_cluster
946

947

948
@pytest.fixture
def opensearch_create_domain(opensearch_wait_for_cluster, aws_client):
    """Factory fixture creating OpenSearch domains; domains are deleted on teardown."""
    created_domains = []

    def factory(**kwargs) -> str:
        if "DomainName" not in kwargs:
            kwargs["DomainName"] = f"test-domain-{short_uid()}"
        domain_name = kwargs["DomainName"]

        aws_client.opensearch.create_domain(**kwargs)
        # Block until the cluster is reachable before handing it to the test.
        opensearch_wait_for_cluster(domain_name=domain_name)

        created_domains.append(domain_name)
        return domain_name

    yield factory

    # cleanup
    for domain in created_domains:
        try:
            aws_client.opensearch.delete_domain(DomainName=domain)
        except Exception as e:
            LOG.debug("error cleaning up domain %s: %s", domain, e)
971

972

973
@pytest.fixture
def opensearch_domain(opensearch_create_domain) -> str:
    """Name of a freshly created OpenSearch test domain."""
    return opensearch_create_domain()
976

977

978
@pytest.fixture
def opensearch_endpoint(opensearch_domain, aws_client) -> str:
    """HTTPS endpoint URL of the test OpenSearch domain."""
    domain_status = aws_client.opensearch.describe_domain(DomainName=opensearch_domain)[
        "DomainStatus"
    ]
    assert "Endpoint" in domain_status
    return f"https://{domain_status['Endpoint']}"
983

984

985
@pytest.fixture
def opensearch_document_path(opensearch_endpoint, aws_client):
    """Index one sample document into the test domain and return its full document URL."""
    document = {
        "first_name": "Boba",
        "last_name": "Fett",
        "age": 41,
        "about": "I'm just a simple man, trying to make my way in the universe.",
        "interests": ["mandalorian armor", "tusken culture"],
    }
    document_path = f"{opensearch_endpoint}/bountyhunters/_doc/1"
    # Identity encoding avoids transparent gzip so the raw response can be inspected.
    response = requests.put(
        document_path,
        data=json.dumps(document),
        headers={"content-type": "application/json", "Accept-encoding": "identity"},
    )
    # 201 Created is the expected status for a newly indexed document.
    assert response.status_code == 201, f"could not create document at: {document_path}"
    return document_path
1002

1003

1004
# Cleanup fixtures
1005
@pytest.fixture
def cleanup_stacks(aws_client):
    """Callable that best-effort deletes the given CloudFormation stacks and waits for each."""

    def _cleanup_stacks(stacks: List[str]) -> None:
        for stack in ensure_list(stacks):
            try:
                aws_client.cloudformation.delete_stack(StackName=stack)
                aws_client.cloudformation.get_waiter("stack_delete_complete").wait(StackName=stack)
            except Exception:
                LOG.debug("Failed to cleanup stack '%s'", stack)

    return _cleanup_stacks
1017

1018

1019
@pytest.fixture
def cleanup_changesets(aws_client):
    """Callable that best-effort deletes the given CloudFormation change sets."""

    def _cleanup_changesets(changesets: List[str]) -> None:
        for cs in ensure_list(changesets):
            try:
                aws_client.cloudformation.delete_change_set(ChangeSetName=cs)
            except Exception:
                LOG.debug("Failed to cleanup changeset '%s'", cs)

    return _cleanup_changesets
1030

1031

1032
# Helpers for Cfn
1033

1034

1035
# TODO: exports(!)
1036
@dataclasses.dataclass(frozen=True)
class DeployResult:
    """Result of a successful deploy via the deploy_cfn_template fixture."""

    # Identifiers of the executed change set and the resulting stack.
    change_set_id: str
    stack_id: str
    stack_name: str
    change_set_name: str
    # Stack outputs as a mapping of OutputKey -> OutputValue.
    outputs: Dict[str, str]

    # Callback that deletes the stack and waits for deletion to finish.
    destroy: Callable[[], None]
1045

1046

1047
class StackDeployError(Exception):
    """Raised when a CloudFormation deploy fails; the message embeds the stack events."""

    def __init__(self, describe_res: dict, events: list[dict]):
        self.describe_result = describe_res
        self.events = events

        encoded_describe_output = json.dumps(self.describe_result, cls=CustomEncoder)
        # Verbose mode includes every event; otherwise only the failing resources.
        if config.CFN_VERBOSE_ERRORS:
            msg = f"Describe output:\n{encoded_describe_output}\nEvents:\n{self.format_events(events)}"
        else:
            msg = f"Describe output:\n{encoded_describe_output}\nFailing resources:\n{self.format_events(events)}"

        super().__init__(msg)

    def format_events(self, events: list[dict]) -> str:
        """Render the relevant events, oldest first, one per line."""
        relevant = [
            event
            for event in sorted(events, key=lambda event: event["Timestamp"])
            if event["ResourceStatus"].endswith("FAILED") or config.CFN_VERBOSE_ERRORS
        ]
        return "\n".join(self.format_event(event) for event in relevant)

    @staticmethod
    def format_event(event: dict) -> str:
        """Render a single stack event, appending the status reason when present."""
        reason = event.get("ResourceStatusReason")
        if not reason:
            return f"- {event['LogicalResourceId']} ({event['ResourceType']}) -> {event['ResourceStatus']}"
        reason = reason.replace("\n", "; ")
        return f"- {event['LogicalResourceId']} ({event['ResourceType']}) -> {event['ResourceStatus']} ({reason})"
1077

1078

1079
@pytest.fixture
def deploy_cfn_template(
    aws_client: ServiceLevelClientFactory,
):
    """Factory fixture deploying CloudFormation templates via change sets.

    Returns a callable that creates and executes a change set, waits for the
    stack to stabilize, and returns a DeployResult. All deployed stacks are
    destroyed on fixture teardown, newest first.
    """
    # (stack_id, destroy-callback) per deployed stack, in creation order.
    state: list[tuple[str, Callable]] = []

    def _deploy(
        *,
        is_update: Optional[bool] = False,
        stack_name: Optional[str] = None,
        change_set_name: Optional[str] = None,
        template: Optional[str] = None,
        template_path: Optional[str | os.PathLike] = None,
        template_mapping: Optional[Dict[str, Any]] = None,
        parameters: Optional[Dict[str, str]] = None,
        role_arn: Optional[str] = None,
        max_wait: Optional[int] = None,
        delay_between_polls: Optional[int] = 2,
        custom_aws_client: Optional[ServiceLevelClientFactory] = None,
    ) -> DeployResult:
        # Updates need an existing stack; creation auto-generates names.
        if is_update:
            assert stack_name
        stack_name = stack_name or f"stack-{short_uid()}"
        change_set_name = change_set_name or f"change-set-{short_uid()}"

        # Real AWS deploys can be slow; allow a much larger default timeout there.
        if max_wait is None:
            max_wait = 1800 if is_aws_cloud() else 180

        if template_path is not None:
            template = load_template_file(template_path)
        template_rendered = render_template(template, **(template_mapping or {}))

        kwargs = dict(
            StackName=stack_name,
            ChangeSetName=change_set_name,
            TemplateBody=template_rendered,
            Capabilities=["CAPABILITY_AUTO_EXPAND", "CAPABILITY_IAM", "CAPABILITY_NAMED_IAM"],
            ChangeSetType=("UPDATE" if is_update else "CREATE"),
            Parameters=[
                {
                    "ParameterKey": k,
                    "ParameterValue": v,
                }
                for (k, v) in (parameters or {}).items()
            ],
        )
        if role_arn is not None:
            kwargs["RoleARN"] = role_arn

        # Allow deploying with a different account/region client than the default.
        cfn_aws_client = custom_aws_client if custom_aws_client is not None else aws_client

        response = cfn_aws_client.cloudformation.create_change_set(**kwargs)

        change_set_id = response["Id"]
        stack_id = response["StackId"]

        cfn_aws_client.cloudformation.get_waiter(WAITER_CHANGE_SET_CREATE_COMPLETE).wait(
            ChangeSetName=change_set_id
        )
        cfn_aws_client.cloudformation.execute_change_set(ChangeSetName=change_set_id)
        stack_waiter = cfn_aws_client.cloudformation.get_waiter(
            WAITER_STACK_UPDATE_COMPLETE if is_update else WAITER_STACK_CREATE_COMPLETE
        )

        try:
            stack_waiter.wait(
                StackName=stack_id,
                WaiterConfig={
                    "Delay": delay_between_polls,
                    "MaxAttempts": max_wait / delay_between_polls,
                },
            )
        except botocore.exceptions.WaiterError as e:
            # Surface the stack description and events for easier debugging.
            raise StackDeployError(
                cfn_aws_client.cloudformation.describe_stacks(StackName=stack_id)["Stacks"][0],
                cfn_aws_client.cloudformation.describe_stack_events(StackName=stack_id)[
                    "StackEvents"
                ],
            ) from e

        describe_stack_res = cfn_aws_client.cloudformation.describe_stacks(StackName=stack_id)[
            "Stacks"
        ][0]
        outputs = describe_stack_res.get("Outputs", [])

        mapped_outputs = {o["OutputKey"]: o.get("OutputValue") for o in outputs}

        def _destroy_stack():
            # Delete and block until deletion is confirmed complete.
            cfn_aws_client.cloudformation.delete_stack(StackName=stack_id)
            cfn_aws_client.cloudformation.get_waiter(WAITER_STACK_DELETE_COMPLETE).wait(
                StackName=stack_id,
                WaiterConfig={
                    "Delay": delay_between_polls,
                    "MaxAttempts": max_wait / delay_between_polls,
                },
            )

        state.append((stack_id, _destroy_stack))

        return DeployResult(
            change_set_id, stack_id, stack_name, change_set_name, mapped_outputs, _destroy_stack
        )

    yield _deploy

    # delete the stacks in the reverse order they were created in case of inter-stack dependencies
    for stack_id, teardown in state[::-1]:
        try:
            teardown()
        except Exception as e:
            LOG.debug("Failed cleaning up stack stack_id=%s: %s", stack_id, e)
1190

1191

1192
@pytest.fixture
def is_change_set_created_and_available(aws_client):
    """Predicate factory: change set creation finished and it is ready to execute."""

    def _is_change_set_created_and_available(change_set_id: str):
        def _inner():
            # TODO: CREATE_FAILED should also not lead to further retries
            change_set = aws_client.cloudformation.describe_change_set(ChangeSetName=change_set_id)
            created = change_set.get("Status") == "CREATE_COMPLETE"
            executable = change_set.get("ExecutionStatus") == "AVAILABLE"
            return created and executable

        return _inner

    return _is_change_set_created_and_available
1206

1207

1208
@pytest.fixture
def is_change_set_failed_and_unavailable(aws_client):
    """Predicate factory: change set creation FAILED and it cannot be executed.

    Fix: the inner factory was misleadingly named
    ``_is_change_set_created_and_available`` (copy-paste from the sibling
    fixture); it is renamed to match what it actually checks. The fixture's
    external interface is unchanged.
    """

    def _is_change_set_failed_and_unavailable(change_set_id: str):
        def _inner():
            change_set = aws_client.cloudformation.describe_change_set(ChangeSetName=change_set_id)
            return (
                # TODO: CREATE_FAILED should also not lead to further retries
                change_set.get("Status") == "FAILED"
                and change_set.get("ExecutionStatus") == "UNAVAILABLE"
            )

        return _inner

    return _is_change_set_failed_and_unavailable
1222

1223

1224
@pytest.fixture
def is_stack_created(aws_client):
    """Predicate factory: stack reached a terminal CREATE status."""
    terminal_statuses = ["CREATE_COMPLETE", "CREATE_FAILED"]
    return _has_stack_status(aws_client.cloudformation, terminal_statuses)
1227

1228

1229
@pytest.fixture
def is_stack_updated(aws_client):
    """Predicate factory: stack reached a terminal UPDATE status."""
    terminal_statuses = ["UPDATE_COMPLETE", "UPDATE_FAILED"]
    return _has_stack_status(aws_client.cloudformation, terminal_statuses)
1232

1233

1234
@pytest.fixture
def is_stack_deleted(aws_client):
    """Predicate factory: stack deletion completed."""
    terminal_statuses = ["DELETE_COMPLETE"]
    return _has_stack_status(aws_client.cloudformation, terminal_statuses)
1237

1238

1239
def _has_stack_status(cfn_client, statuses: List[str]):
1✔
1240
    def _has_status(stack_id: str):
1✔
1241
        def _inner():
1✔
1242
            resp = cfn_client.describe_stacks(StackName=stack_id)
1✔
1243
            s = resp["Stacks"][0]  # since the lookup  uses the id we can only get a single response
1✔
1244
            return s.get("StackStatus") in statuses
1✔
1245

1246
        return _inner
1✔
1247

1248
    return _has_status
1✔
1249

1250

1251
@pytest.fixture
def is_change_set_finished(aws_client):
    """Predicate factory: change set execution completed (short-circuits on failure)."""

    def _is_change_set_finished(change_set_id: str, stack_name: Optional[str] = None):
        def _inner():
            describe_kwargs = {"ChangeSetName": change_set_id}
            if stack_name:
                describe_kwargs["StackName"] = stack_name

            check_set = aws_client.cloudformation.describe_change_set(**describe_kwargs)
            execution_status = check_set.get("ExecutionStatus")

            if execution_status == "EXECUTE_FAILED":
                LOG.warning("Change set failed")
                # Abort the surrounding wait loop instead of retrying a lost cause.
                raise ShortCircuitWaitException()

            return execution_status == "EXECUTE_COMPLETE"

        return _inner

    return _is_change_set_finished
1270

1271

1272
@pytest.fixture
def wait_until_lambda_ready(aws_client):
    """Fixture factory blocking until a Lambda function leaves the Pending state.

    Fix: the ``qualifier`` argument was collected into ``kwargs`` but never
    passed to ``get_function``, so qualified lookups silently checked the
    unqualified function instead. ``kwargs`` is now forwarded to the call.
    """

    def _wait_until_ready(function_name: str, qualifier: str = None, client=None):
        client = client or aws_client.lambda_

        def _is_not_pending():
            kwargs = {}
            if qualifier:
                kwargs["Qualifier"] = qualifier
            try:
                # Forward the qualifier so version/alias state is checked when requested.
                result = (
                    client.get_function(FunctionName=function_name, **kwargs)["Configuration"][
                        "State"
                    ]
                    != "Pending"
                )
                LOG.debug("lambda state result: result=%s", result)
                return result
            except Exception as e:
                LOG.error(e)
                raise

        wait_until(_is_not_pending)

    return _wait_until_ready
1295

1296

1297
role_assume_policy = """
1✔
1298
{
1299
  "Version": "2012-10-17",
1300
  "Statement": [
1301
    {
1302
      "Effect": "Allow",
1303
      "Principal": {
1304
        "Service": "lambda.amazonaws.com"
1305
      },
1306
      "Action": "sts:AssumeRole"
1307
    }
1308
  ]
1309
}
1310
""".strip()
1311

1312
role_policy = """
1✔
1313
{
1314
    "Version": "2012-10-17",
1315
    "Statement": [
1316
        {
1317
            "Effect": "Allow",
1318
            "Action": [
1319
                "logs:CreateLogGroup",
1320
                "logs:CreateLogStream",
1321
                "logs:PutLogEvents"
1322
            ],
1323
            "Resource": [
1324
                "*"
1325
            ]
1326
        }
1327
    ]
1328
}
1329
""".strip()
1330

1331

1332
@pytest.fixture
def create_lambda_function_aws(aws_client):
    """Factory fixture wrapping lambda_.create_function with retry, readiness wait and cleanup."""
    lambda_arns = []

    def _create_lambda_function(**kwargs):
        def _create_function():
            resp = aws_client.lambda_.create_function(**kwargs)
            lambda_arns.append(resp["FunctionArn"])

            def _is_not_pending():
                try:
                    state = aws_client.lambda_.get_function(FunctionName=resp["FunctionName"])[
                        "Configuration"
                    ]["State"]
                    return state != "Pending"
                except Exception as e:
                    LOG.error(e)
                    raise

            wait_until(_is_not_pending)
            return resp

        # @AWS, takes about 10s until the role/policy is "active", until then it will fail
        # localstack should normally not require the retries and will just continue here
        return retry(_create_function, retries=3, sleep=4)

    yield _create_lambda_function

    for arn in lambda_arns:
        try:
            aws_client.lambda_.delete_function(FunctionName=arn)
        except Exception:
            LOG.debug("Unable to delete function arn=%s in cleanup", arn)
1368

1369

1370
@pytest.fixture
def create_lambda_function(aws_client, wait_until_lambda_ready, lambda_su_role):
    """Factory fixture creating Lambda functions via testutil; cleans up functions and log groups."""
    lambda_arns_and_clients = []
    log_groups = []
    lambda_client = aws_client.lambda_
    logs_client = aws_client.logs
    s3_client = aws_client.s3

    def _create_lambda_function(*args, **kwargs):
        client = kwargs.get("client") or lambda_client
        kwargs["client"] = client
        kwargs["s3_client"] = s3_client
        func_name = kwargs.pop("func_name", None)
        assert func_name

        # Default to the superuser role unless the caller provided one.
        if not kwargs.get("role"):
            kwargs["role"] = lambda_su_role

        def _create_function():
            resp = testutil.create_lambda_function(func_name, **kwargs)
            lambda_arns_and_clients.append((resp["CreateFunctionResponse"]["FunctionArn"], client))
            wait_until_lambda_ready(function_name=func_name, client=client)
            log_groups.append(f"/aws/lambda/{func_name}")
            return resp

        # @AWS, takes about 10s until the role/policy is "active", until then it will fail
        # localstack should normally not require the retries and will just continue here
        return retry(_create_function, retries=3, sleep=4)

    yield _create_lambda_function

    for arn, client in lambda_arns_and_clients:
        try:
            client.delete_function(FunctionName=arn)
        except Exception:
            LOG.debug("Unable to delete function arn=%s in cleanup", arn)

    for log_group_name in log_groups:
        try:
            logs_client.delete_log_group(logGroupName=log_group_name)
        except Exception:
            LOG.debug("Unable to delete log group %s in cleanup", log_group_name)
1414

1415

1416
@pytest.fixture
def create_echo_http_server(aws_client, create_lambda_function):
    """Factory fixture deploying a Lambda function URL that echoes incoming HTTP requests."""
    from localstack.aws.api.lambda_ import Runtime

    lambda_client = aws_client.lambda_
    # Source of the echo Lambda; deployed verbatim as the function handler.
    handler_code = textwrap.dedent(
        """
    import json
    import os


    def make_response(body: dict, status_code: int = 200):
        return {
            "statusCode": status_code,
            "headers": {"Content-Type": "application/json"},
            "body": body,
        }


    def trim_headers(headers):
        if not int(os.getenv("TRIM_X_HEADERS", 0)):
            return headers
        return {
            key: value for key, value in headers.items()
            if not (key.startswith("x-amzn") or key.startswith("x-forwarded-"))
        }


    def handler(event, context):
        print(json.dumps(event))
        response = {
            "args": event.get("queryStringParameters", {}),
            "data": event.get("body", ""),
            "domain": event["requestContext"].get("domainName", ""),
            "headers": trim_headers(event.get("headers", {})),
            "method": event["requestContext"]["http"].get("method", ""),
            "origin": event["requestContext"]["http"].get("sourceIp", ""),
            "path": event["requestContext"]["http"].get("path", ""),
        }
        return make_response(response)"""
    )

    def _create_echo_http_server(trim_x_headers: bool = False) -> str:
        """Creates a server that will echo any request. Any request will be returned with the
        following format. Any unset values will have those defaults.
        `trim_x_headers` can be set to True to trim some headers that are automatically added by lambda in
        order to create easier Snapshot testing. Default: `False`
        {
          "args": {},
          "headers": {},
          "data": "",
          "method": "",
          "domain": "",
          "origin": "",
          "path": ""
        }"""
        zip_file = testutil.create_lambda_archive(handler_code, get_content=True)
        func_name = f"echo-http-{short_uid()}"
        create_lambda_function(
            func_name=func_name,
            zip_file=zip_file,
            runtime=Runtime.python3_12,
            envvars={"TRIM_X_HEADERS": "1" if trim_x_headers else "0"},
        )
        # Expose the function through a public (AuthType NONE) function URL.
        url_response = lambda_client.create_function_url_config(
            FunctionName=func_name, AuthType="NONE"
        )
        # Resource policy allowing unauthenticated invocation of the function URL.
        aws_client.lambda_.add_permission(
            FunctionName=func_name,
            StatementId="urlPermission",
            Action="lambda:InvokeFunctionUrl",
            Principal="*",
            FunctionUrlAuthType="NONE",
        )
        return url_response["FunctionUrl"]

    yield _create_echo_http_server
1493

1494

1495
@pytest.fixture
def create_event_source_mapping(aws_client):
    """Factory fixture creating Lambda event source mappings, removed again on teardown."""
    mapping_uuids = []

    def _create_event_source_mapping(*args, **kwargs):
        response = aws_client.lambda_.create_event_source_mapping(*args, **kwargs)
        mapping_uuids.append(response["UUID"])
        return response

    yield _create_event_source_mapping

    for uuid in mapping_uuids:
        try:
            aws_client.lambda_.delete_event_source_mapping(UUID=uuid)
        except Exception:
            LOG.debug("Unable to delete event source mapping %s in cleanup", uuid)
1511

1512

1513
@pytest.fixture
def check_lambda_logs(aws_client):
    """Callable asserting that a function's CloudWatch logs contain the expected lines."""

    def _check_logs(func_name: str, expected_lines: List[str] = None) -> List[str]:
        expected_lines = expected_lines or []
        log_events = get_lambda_logs(func_name, logs_client=aws_client.logs)
        log_messages = [e["message"] for e in log_events]
        for line in expected_lines:
            # Expected lines containing ".*" are treated as regular expressions.
            if ".*" in line:
                if any(re.match(line, m, flags=re.DOTALL) for m in log_messages):
                    continue
            assert line in log_messages
        return log_messages

    return _check_logs
1529

1530

1531
@pytest.fixture
def create_policy(aws_client):
    """Factory fixture creating IAM policies; policies are deleted on teardown."""
    created_policies = []

    def _create_policy(*args, iam_client=None, **kwargs):
        iam_client = iam_client or aws_client.iam
        if "PolicyName" not in kwargs:
            kwargs["PolicyName"] = f"policy-{short_uid()}"
        response = iam_client.create_policy(*args, **kwargs)
        # Remember which client created the policy so cleanup hits the same account/region.
        created_policies.append((response["Policy"]["Arn"], iam_client))
        return response

    yield _create_policy

    for policy_arn, iam_client in created_policies:
        try:
            iam_client.delete_policy(PolicyArn=policy_arn)
        except Exception:
            LOG.debug("Could not delete policy '%s' during test cleanup", policy_arn)
1551

1552

1553
@pytest.fixture
def create_user(aws_client):
    """Factory fixture creating IAM users.

    Teardown detaches/deletes the user's inline policies, managed policy
    attachments and access keys before deleting the user itself, since IAM
    refuses to delete users that still have any of these attached.
    """
    usernames = []

    def _create_user(**kwargs):
        if "UserName" not in kwargs:
            kwargs["UserName"] = f"user-{short_uid()}"
        response = aws_client.iam.create_user(**kwargs)
        usernames.append(response["User"]["UserName"])
        return response

    yield _create_user

    for username in usernames:
        # If even listing policies fails, the user is most likely gone already.
        try:
            inline_policies = aws_client.iam.list_user_policies(UserName=username)["PolicyNames"]
        except ClientError as e:
            LOG.debug(
                "Cannot list user policies: %s. User %s probably already deleted...", e, username
            )
            continue

        # 1. Delete inline policies.
        for inline_policy in inline_policies:
            try:
                aws_client.iam.delete_user_policy(UserName=username, PolicyName=inline_policy)
            except Exception:
                LOG.debug(
                    "Could not delete user policy '%s' from '%s' during cleanup",
                    inline_policy,
                    username,
                )
        # 2. Detach managed policies.
        attached_policies = aws_client.iam.list_attached_user_policies(UserName=username)[
            "AttachedPolicies"
        ]
        for attached_policy in attached_policies:
            try:
                aws_client.iam.detach_user_policy(
                    UserName=username, PolicyArn=attached_policy["PolicyArn"]
                )
            except Exception:
                LOG.debug(
                    "Error detaching policy '%s' from user '%s'",
                    attached_policy["PolicyArn"],
                    username,
                )
        # 3. Delete access keys.
        access_keys = aws_client.iam.list_access_keys(UserName=username)["AccessKeyMetadata"]
        for access_key in access_keys:
            try:
                aws_client.iam.delete_access_key(
                    UserName=username, AccessKeyId=access_key["AccessKeyId"]
                )
            except Exception:
                LOG.debug(
                    "Error deleting access key '%s' from user '%s'",
                    access_key["AccessKeyId"],
                    username,
                )

        # 4. Finally delete the user itself.
        try:
            aws_client.iam.delete_user(UserName=username)
        except Exception as e:
            LOG.debug("Error deleting user '%s' during test cleanup: %s", username, e)
1615

1616

1617
@pytest.fixture
def wait_and_assume_role(aws_client):
    """Return a helper that assumes an IAM role via STS, retrying until AWS permits it."""

    def _wait_and_assume_role(role_arn: str, session_name: str = None, **kwargs):
        session_name = session_name or f"session-{short_uid()}"

        def assume_role():
            response = aws_client.sts.assume_role(
                RoleArn=role_arn, RoleSessionName=session_name, **kwargs
            )
            return response["Credentials"]

        # need to retry a couple of times before we are allowed to assume this role in AWS
        return retry(assume_role, sleep=5, retries=4)

    return _wait_and_assume_role
1633

1634

1635
@pytest.fixture
def create_role(aws_client):
    """Factory fixture creating IAM roles.

    On teardown, detaches managed policies, deletes inline policies, and
    deletes each created role (best-effort, failures are logged).
    """
    created = []

    def _create_role(iam_client=None, **kwargs):
        if not kwargs.get("RoleName"):
            kwargs["RoleName"] = f"role-{short_uid()}"
        client = iam_client or aws_client.iam
        result = client.create_role(**kwargs)
        created.append((result["Role"]["RoleName"], client))
        return result

    yield _create_role

    for role_name, client in created:
        # detach managed policies first, otherwise the role cannot be deleted
        try:
            attached = client.list_attached_role_policies(RoleName=role_name)["AttachedPolicies"]
        except ClientError as e:
            LOG.debug(
                "Cannot list attached role policies: %s. Role %s probably already deleted...",
                e,
                role_name,
            )
            continue
        for policy in attached:
            try:
                client.detach_role_policy(RoleName=role_name, PolicyArn=policy["PolicyArn"])
            except Exception:
                LOG.debug(
                    "Could not detach role policy '%s' from '%s' during cleanup",
                    policy["PolicyArn"],
                    role_name,
                )
        for inline_policy_name in client.list_role_policies(RoleName=role_name)["PolicyNames"]:
            try:
                client.delete_role_policy(RoleName=role_name, PolicyName=inline_policy_name)
            except Exception:
                LOG.debug(
                    "Could not delete role policy '%s' from '%s' during cleanup",
                    inline_policy_name,
                    role_name,
                )
        try:
            client.delete_role(RoleName=role_name)
        except Exception:
            LOG.debug("Could not delete role '%s' during cleanup", role_name)
1687

1688

1689
@pytest.fixture
def create_parameter(aws_client):
    """Factory fixture creating SSM parameters; deletes them on teardown."""
    created_names = []

    def _create_parameter(**kwargs):
        created_names.append(kwargs["Name"])
        return aws_client.ssm.put_parameter(**kwargs)

    yield _create_parameter

    for name in created_names:
        aws_client.ssm.delete_parameter(Name=name)
1701

1702

1703
@pytest.fixture
def create_secret(aws_client):
    """Factory fixture creating Secrets Manager secrets; force-deletes them on teardown."""
    secret_arns = []

    def _create_secret(**kwargs):
        response = aws_client.secretsmanager.create_secret(**kwargs)
        secret_arns.append(response["ARN"])
        return response

    yield _create_secret

    for arn in secret_arns:
        aws_client.secretsmanager.delete_secret(SecretId=arn, ForceDeleteWithoutRecovery=True)
1716

1717

1718
# TODO Figure out how to make cert creation tests pass against AWS.
1719
#
1720
# We would like to have localstack tests to pass not just against localstack, but also against AWS to make sure
1721
# our emulation is correct. Unfortunately, with certificate creation there are some issues.
1722
#
1723
# In AWS newly created ACM certificates have to be validated either by email or by DNS. The latter is
1724
# by adding some CNAME records as requested by AWS in response to a certificate request.
1725
# For testing purposes the DNS one seems to be easier, at least as long as DNS is handled by the Route53 AWS DNS service.
1726
#
1727
# The other possible option is to use IAM certificates instead of ACM ones. Those just have to be uploaded from files
1728
# created by openssl etc. Not sure if there are other issues after that.
1729
#
1730
# The third option might be having in AWS some certificates created in advance - so they do not require validation
1731
# and can be easily used in tests. The issue with such an approach is that for AppSync, for example, in order to
1732
# register a domain name (https://docs.aws.amazon.com/appsync/latest/APIReference/API_CreateDomainName.html),
1733
# the domain name in the API request has to match the domain name used in certificate creation. Which means that with
1734
# pre-created certificates we would have to use specific domain names instead of random ones.
1735
@pytest.fixture
def acm_request_certificate(aws_client_factory):
    """Factory fixture requesting ACM certificates; deletes them on teardown."""
    requested = []

    def factory(**kwargs) -> str:
        if "DomainName" not in kwargs:
            kwargs["DomainName"] = f"test-domain-{short_uid()}.localhost.localstack.cloud"

        region_name = kwargs.pop("region_name", None)
        acm_client = aws_client_factory(region_name=region_name).acm

        response = acm_client.request_certificate(**kwargs)
        requested.append((response["CertificateArn"], region_name))
        return response

    yield factory

    # cleanup: best-effort delete of every requested certificate
    for certificate_arn, region_name in requested:
        try:
            acm_client = aws_client_factory(region_name=region_name).acm
            acm_client.delete_certificate(CertificateArn=certificate_arn)
        except Exception as e:
            LOG.debug("error cleaning up certificate %s: %s", certificate_arn, e)
1760

1761

1762
# Allow-all ("super user") IAM policy document used by test roles below.
role_policy_su = {
    "Version": "2012-10-17",
    "Statement": [{"Effect": "Allow", "Action": ["*"], "Resource": ["*"]}],
}
1766

1767

1768
@pytest.fixture(scope="session")
def lambda_su_role(aws_client):
    """Session-scoped IAM role with an attached allow-all policy for Lambda tests.

    Yields the role ARN; on teardown, detaches the policy and deletes both the
    role and the policy (best-effort).
    """
    role_name = f"lambda-autogenerated-{short_uid()}"
    role = aws_client.iam.create_role(
        RoleName=role_name, AssumeRolePolicyDocument=role_assume_policy
    )["Role"]
    policy_name = f"lambda-autogenerated-{short_uid()}"
    policy_arn = aws_client.iam.create_policy(
        PolicyName=policy_name, PolicyDocument=json.dumps(role_policy_su)
    )["Policy"]["Arn"]
    aws_client.iam.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)

    if is_aws_cloud():  # dirty but necessary: wait for IAM propagation
        time.sleep(10)

    yield role["Arn"]

    # Pass the client methods as callables (with kwargs) to run_safe so that
    # exceptions during cleanup are actually suppressed. Previously the methods
    # were invoked eagerly and run_safe only received their (already computed)
    # result, so a failing detach/delete would raise out of the fixture.
    run_safe(aws_client.iam.detach_role_policy, RoleName=role_name, PolicyArn=policy_arn)
    run_safe(aws_client.iam.delete_role, RoleName=role_name)
    run_safe(aws_client.iam.delete_policy, PolicyArn=policy_arn)
1788

1789

1790
@pytest.fixture
def create_iam_role_and_attach_policy(aws_client):
    """
    Fixture that creates an IAM role with given role definition and predefined policy ARN.

    Use this fixture with AWS managed policies like 'AmazonS3ReadOnlyAccess' or 'AmazonKinesisFullAccess'.
    """
    created_roles = []

    def _inner(**kwargs: dict[str, any]) -> str:
        """
        :param dict RoleDefinition: role definition document
        :param str PolicyArn: policy ARN
        :param str RoleName: role name (autogenerated if omitted)
        :return: role ARN
        """
        if "RoleName" not in kwargs:
            kwargs["RoleName"] = f"test-role-{short_uid()}"

        role_name = kwargs["RoleName"]
        assume_policy_doc = json.dumps(kwargs["RoleDefinition"])

        create_response = aws_client.iam.create_role(
            RoleName=role_name, AssumeRolePolicyDocument=assume_policy_doc
        )

        aws_client.iam.attach_role_policy(PolicyArn=kwargs["PolicyArn"], RoleName=role_name)

        created_roles.append(role_name)
        return create_response["Role"]["Arn"]

    yield _inner

    for role_name in created_roles:
        try:
            aws_client.iam.delete_role(RoleName=role_name)
        except Exception as exc:
            LOG.debug("Error deleting IAM role '%s': %s", role_name, exc)
1828

1829

1830
@pytest.fixture
def create_iam_role_with_policy(aws_client):
    """
    Fixture that creates an IAM role with given role definition and policy definition.
    """
    created = {}

    def _create_role_and_policy(**kwargs: dict[str, any]) -> str:
        """
        :param dict RoleDefinition: role definition document
        :param dict PolicyDefinition: policy definition document
        :param str PolicyName: policy name (autogenerated if omitted)
        :param str RoleName: role name (autogenerated if omitted)
        :return: role ARN
        """
        if "RoleName" not in kwargs:
            kwargs["RoleName"] = f"test-role-{short_uid()}"
        if "PolicyName" not in kwargs:
            kwargs["PolicyName"] = f"test-policy-{short_uid()}"
        role_name = kwargs["RoleName"]
        policy_name = kwargs["PolicyName"]

        create_response = aws_client.iam.create_role(
            RoleName=role_name, AssumeRolePolicyDocument=json.dumps(kwargs["RoleDefinition"])
        )
        aws_client.iam.put_role_policy(
            RoleName=role_name,
            PolicyName=policy_name,
            PolicyDocument=json.dumps(kwargs["PolicyDefinition"]),
        )
        created[role_name] = policy_name
        return create_response["Role"]["Arn"]

    yield _create_role_and_policy

    for role_name, policy_name in created.items():
        try:
            aws_client.iam.delete_role_policy(RoleName=role_name, PolicyName=policy_name)
        except Exception as exc:
            LOG.debug("Error deleting IAM role policy '%s' '%s': %s", role_name, policy_name, exc)
        try:
            aws_client.iam.delete_role(RoleName=role_name)
        except Exception as exc:
            LOG.debug("Error deleting IAM role '%s': %s", role_name, exc)
1874

1875

1876
@pytest.fixture
def firehose_create_delivery_stream(wait_for_delivery_stream_ready, aws_client):
    """Factory fixture creating Firehose delivery streams and waiting for readiness.

    Deletes the created streams on teardown (best-effort).
    """
    stream_names = []

    def _create_delivery_stream(**kwargs):
        if "DeliveryStreamName" not in kwargs:
            kwargs["DeliveryStreamName"] = f"test-delivery-stream-{short_uid()}"
        stream_name = kwargs["DeliveryStreamName"]
        response = aws_client.firehose.create_delivery_stream(**kwargs)
        stream_names.append(stream_name)
        wait_for_delivery_stream_ready(stream_name)
        return response

    yield _create_delivery_stream

    for stream_name in stream_names:
        try:
            aws_client.firehose.delete_delivery_stream(DeliveryStreamName=stream_name)
        except Exception:
            LOG.info("Failed to delete delivery stream %s", stream_name)
1896

1897

1898
@pytest.fixture
def ses_configuration_set(aws_client):
    """Factory fixture creating SES configuration sets; deletes them on teardown."""
    created_sets = []

    def factory(name: str) -> None:
        aws_client.ses.create_configuration_set(
            ConfigurationSet={
                "Name": name,
            },
        )
        created_sets.append(name)

    yield factory

    for set_name in created_sets:
        aws_client.ses.delete_configuration_set(ConfigurationSetName=set_name)
1914

1915

1916
@pytest.fixture
def ses_configuration_set_sns_event_destination(aws_client):
    """Factory fixture creating SES SNS event destinations; deletes them on teardown."""
    created_destinations = []

    def factory(config_set_name: str, event_destination_name: str, topic_arn: str) -> None:
        aws_client.ses.create_configuration_set_event_destination(
            ConfigurationSetName=config_set_name,
            EventDestination={
                "Name": event_destination_name,
                "Enabled": True,
                "MatchingEventTypes": ["send", "bounce", "delivery", "open", "click"],
                "SNSDestination": {
                    "TopicARN": topic_arn,
                },
            },
        )
        created_destinations.append((config_set_name, event_destination_name))

    yield factory

    for set_name, destination_name in created_destinations:
        aws_client.ses.delete_configuration_set_event_destination(
            ConfigurationSetName=set_name,
            EventDestinationName=destination_name,
        )
1941

1942

1943
@pytest.fixture
def ses_email_template(aws_client):
    """Factory fixture creating SES email templates; deletes them on teardown."""
    created_templates = []

    def factory(name: str, contents: str, subject: str = f"Email template {short_uid()}"):
        aws_client.ses.create_template(
            Template={
                "TemplateName": name,
                "SubjectPart": subject,
                "TextPart": contents,
            }
        )
        created_templates.append(name)

    yield factory

    for template_name in created_templates:
        aws_client.ses.delete_template(TemplateName=template_name)
1961

1962

1963
@pytest.fixture
def ses_verify_identity(aws_client):
    """Factory fixture verifying SES email identities; deletes them on teardown."""
    identities = []

    def factory(email_address: str) -> None:
        aws_client.ses.verify_email_identity(EmailAddress=email_address)
        # Track the identity so teardown can actually delete it. Previously the
        # list was never populated, so the cleanup loop below was a no-op and
        # verified identities leaked across tests.
        identities.append(email_address)

    yield factory

    for identity in identities:
        aws_client.ses.delete_identity(Identity=identity)
1974

1975

1976
@pytest.fixture
def setup_sender_email_address(ses_verify_identity):
    """
    If the test is running against AWS then assume the email address passed is already
    verified, and passes the given email address through. Otherwise, it generates one random
    email address and verify them.
    """

    def inner(sender_email_address: Optional[str] = None) -> str:
        if not is_aws_cloud():
            # overwrite the given parameters with localstack specific ones
            sender_email_address = f"sender-{short_uid()}@example.com"
            ses_verify_identity(sender_email_address)
            return sender_email_address

        if sender_email_address is None:
            raise ValueError(
                "sender_email_address must be specified to run this test against AWS"
            )
        return sender_email_address

    return inner
1998

1999

2000
@pytest.fixture
def ec2_create_security_group(aws_client):
    """Factory fixture creating EC2 security groups with optional ingress rules.

    Deletes the created security groups on teardown (best-effort).
    """
    ec2_sgs = []

    def factory(ports=None, ip_protocol: str = "tcp", **kwargs):
        """
        Create the target group and authorize the security group ingress.
        :param ports: list of ports to be authorized for the ingress rule.
        :param ip_protocol: the ip protocol for the permissions (tcp by default)
        """
        if "GroupName" not in kwargs:
            # FIXME: This will fail against AWS since the sg prefix is not valid for GroupName
            # > "Group names may not be in the format sg-*".
            kwargs["GroupName"] = f"sg-{short_uid()}"
        # Making sure the call to CreateSecurityGroup gets the right arguments
        _args = select_from_typed_dict(CreateSecurityGroupRequest, kwargs)
        security_group = aws_client.ec2.create_security_group(**_args)
        security_group_id = security_group["GroupId"]

        permissions = [
            {
                "FromPort": port,
                "IpProtocol": ip_protocol,
                "IpRanges": [{"CidrIp": "0.0.0.0/0"}],
                "ToPort": port,
            }
            for port in ports or []
        ]
        # Only authorize ingress when there is at least one permission: calling
        # authorize_security_group_ingress with an empty IpPermissions list is
        # rejected by AWS (this addresses the FIXME previously noted here).
        if permissions:
            aws_client.ec2.authorize_security_group_ingress(
                GroupId=security_group_id,
                IpPermissions=permissions,
            )

        ec2_sgs.append(security_group_id)
        return security_group

    yield factory

    for sg_group_id in ec2_sgs:
        try:
            aws_client.ec2.delete_security_group(GroupId=sg_group_id)
        except Exception as e:
            LOG.debug("Error cleaning up EC2 security group: %s, %s", sg_group_id, e)
2045

2046

2047
@pytest.fixture
def cleanups():
    """Yield a list to which tests append cleanup callables; run them in reverse order."""
    cleanup_fns = []

    yield cleanup_fns

    for cleanup_callback in reversed(cleanup_fns):
        try:
            cleanup_callback()
        except Exception as e:
            LOG.warning("Failed to execute cleanup", exc_info=e)
2058

2059

2060
@pytest.fixture(scope="session")
def account_id(aws_client):
    """AWS account ID of the primary test client (resolved via STS when available)."""
    if not (is_aws_cloud() or is_api_enabled("sts")):
        return TEST_AWS_ACCOUNT_ID
    return aws_client.sts.get_caller_identity()["Account"]
2066

2067

2068
@pytest.fixture(scope="session")
def region_name(aws_client):
    """Region name of the primary test client (from the STS client metadata)."""
    if not (is_aws_cloud() or is_api_enabled("sts")):
        return TEST_AWS_REGION_NAME
    return aws_client.sts.meta.region_name
2074

2075

2076
@pytest.fixture(scope="session")
def partition(region_name):
    """AWS partition derived from the test region name."""
    return get_partition(region_name)
2079

2080

2081
@pytest.fixture(scope="session")
def secondary_account_id(secondary_aws_client):
    """AWS account ID of the secondary (cross-account) test client."""
    if not (is_aws_cloud() or is_api_enabled("sts")):
        return SECONDARY_TEST_AWS_ACCOUNT_ID
    return secondary_aws_client.sts.get_caller_identity()["Account"]
2087

2088

2089
@pytest.fixture(scope="session")
def secondary_region_name():
    """Region name used for the secondary (cross-region) test client."""
    return SECONDARY_TEST_AWS_REGION_NAME
2092

2093

2094
@pytest.hookimpl
def pytest_collection_modifyitems(config: Config, items: list[Item]):
    """Attach skip/selection markers to collected test items."""
    only_localstack = pytest.mark.skipif(
        is_aws_cloud(),
        reason="test only applicable if run against localstack",
    )
    for item in items:
        for mark in item.iter_markers():
            if mark.name.endswith("only_localstack"):
                item.add_marker(only_localstack)
        # Tests using the snapshot fixture get a snapshot_validated marker,
        # which allows selecting only snapshot-validated tests (e.g. to capture
        # new snapshots for a whole test file with "-m snapshot_validated").
        if hasattr(item, "fixturenames") and "snapshot" in item.fixturenames:
            item.add_marker("snapshot_validated")
2110

2111

2112
@pytest.fixture
def sample_stores() -> AccountRegionBundle:
    """Return an AccountRegionBundle over a sample store exposing cross-account,
    cross-region, and region-local attributes (for store-behavior tests)."""

    class SampleStore(BaseStore):
        CROSS_ACCOUNT_ATTR = CrossAccountAttribute(default=list)
        CROSS_REGION_ATTR = CrossRegionAttribute(default=list)
        region_specific_attr = LocalAttribute(default=list)

    return AccountRegionBundle("zzz", SampleStore, validate=False)
2120

2121

2122
@pytest.fixture
def create_rest_apigw(aws_client_factory):
    """Factory fixture creating API Gateway REST APIs.

    On teardown, deletes each API together with any usage plans (and the API
    keys linked to them) that reference it.
    """
    created_apis = []
    retry_boto_config = None
    if is_aws_cloud():
        # Api gateway can throttle requests pretty heavily. Leading to potentially undeleted apis
        retry_boto_config = botocore.config.Config(
            retries={"max_attempts": 10, "mode": "adaptive"}
        )

    def _create_apigateway_function(**kwargs):
        client_region_name = kwargs.pop("region_name", None)
        apigw = aws_client_factory(
            region_name=client_region_name, config=retry_boto_config
        ).apigateway
        kwargs.setdefault("name", f"api-{short_uid()}")

        response = apigw.create_rest_api(**kwargs)
        api_id = response.get("id")
        created_apis.append((api_id, client_region_name))

        return api_id, response.get("name"), response.get("rootResourceId")

    yield _create_apigateway_function

    for api_id, api_region in created_apis:
        apigw = aws_client_factory(region_name=api_region, config=retry_boto_config).apigateway
        # First, retrieve the usage plans associated with the REST API
        plan_ids = []
        for plan in apigw.get_usage_plans().get("items", []):
            stages = plan.get("apiStages", [])
            plan_ids.extend(plan.get("id") for stage in stages if stage.get("apiId") == api_id)

        # Then delete the API, as you can't delete the UsagePlan if a stage is associated with it
        with contextlib.suppress(Exception):
            apigw.delete_rest_api(restApiId=api_id)

        # finally delete the usage plans and the API Keys linked to it
        for plan_id in plan_ids:
            for key in apigw.get_usage_plan_keys(usagePlanId=plan_id).get("items", []):
                apigw.delete_api_key(apiKey=key["id"])
            apigw.delete_usage_plan(usagePlanId=plan_id)
2171

2172

2173
@pytest.fixture
def create_rest_apigw_openapi(aws_client_factory):
    """Factory fixture importing API Gateway REST APIs from OpenAPI definitions;
    deletes them on teardown (best-effort)."""
    imported_apis = []

    def _create_apigateway_function(**kwargs):
        region_name = kwargs.pop("region_name", None)
        apigw = aws_client_factory(region_name=region_name).apigateway

        response = apigw.import_rest_api(**kwargs)
        api_id = response.get("id")
        imported_apis.append((api_id, region_name))
        return api_id, response

    yield _create_apigateway_function

    for api_id, api_region in imported_apis:
        with contextlib.suppress(Exception):
            apigw = aws_client_factory(region_name=api_region).apigateway
            apigw.delete_rest_api(restApiId=api_id)
2192

2193

2194
@pytest.fixture
def appsync_create_api(aws_client):
    """Factory fixture creating AppSync GraphQL APIs; deletes them on teardown."""
    api_ids = []

    def factory(**kwargs):
        if "name" not in kwargs:
            kwargs["name"] = f"graphql-api-testing-name-{short_uid()}"
        if not kwargs.get("authenticationType"):
            kwargs["authenticationType"] = "API_KEY"

        graphql_api = aws_client.appsync.create_graphql_api(**kwargs)["graphqlApi"]
        api_ids.append(graphql_api["apiId"])
        return graphql_api

    yield factory

    for api_id in api_ids:
        try:
            aws_client.appsync.delete_graphql_api(apiId=api_id)
        except Exception as e:
            LOG.debug("Error cleaning up AppSync API: %s, %s", api_id, e)
2215

2216

2217
@pytest.fixture
def assert_host_customisation(monkeypatch):
    """Patch LOCALSTACK_HOST to a known value and yield an asserter that checks
    whether generated URLs honor (or override) that host."""
    localstack_host = "foo.bar"
    monkeypatch.setattr(
        config, "LOCALSTACK_HOST", config.HostAndPort(host=localstack_host, port=8888)
    )

    def asserter(
        url: str,
        *,
        custom_host: Optional[str] = None,
    ):
        if custom_host is None:
            assert localstack_host in url, f"Could not find `{localstack_host}` in `{url}`"
        else:
            assert custom_host in url, f"Could not find `{custom_host}` in `{url}`"

            assert localstack_host not in url

    yield asserter
2237

2238

2239
@pytest.fixture
def echo_http_server(httpserver: HTTPServer):
    """Spins up a local HTTP echo server and returns the endpoint URL"""

    def _echo(request: Request) -> Response:
        parsed_json = None
        if request.is_json:
            with contextlib.suppress(ValueError):
                parsed_json = json.loads(request.data)
        payload = {
            "data": request.data or "{}",
            "headers": dict(request.headers),
            "url": request.url,
            "method": request.method,
            "json": parsed_json,
        }
        return Response(json.dumps(json_safe(payload)), status=200)

    httpserver.expect_request("").respond_with_handler(_echo)
    return httpserver.url_for("/")
2262

2263

2264
@pytest.fixture
def echo_http_server_post(echo_http_server):
    """
    Returns an HTTP echo server URL for POST requests that work both locally and for parity tests (against real AWS)
    """
    if not is_aws_cloud():
        return f"{echo_http_server}post"
    return f"{PUBLIC_HTTP_ECHO_SERVER_URL}/post"
2273

2274

2275
def create_policy_doc(effect: str, actions: List, resource=None) -> Dict:
    """Build a minimal IAM policy document with a single statement."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # TODO statement ids have to be alphanumeric [0-9A-Za-z], write a test for it
                "Sid": f"s{short_uid()}",
                "Effect": effect,
                "Action": ensure_list(actions),
                "Resource": resource or "*",
            }
        ],
    }
2290

2291

2292
@pytest.fixture
def create_policy_generated_document(create_policy):
    """Return a helper that creates an IAM policy from a generated policy document
    and returns its ARN."""

    def _create_policy_with_doc(effect, actions, policy_name=None, resource=None, iam_client=None):
        name = policy_name or f"p-{short_uid()}"
        document = create_policy_doc(effect, actions, resource=resource)
        response = create_policy(
            PolicyName=name, PolicyDocument=json.dumps(document), iam_client=iam_client
        )
        return response["Policy"]["Arn"]

    return _create_policy_with_doc
2304

2305

2306
@pytest.fixture
def create_role_with_policy(create_role, create_policy_generated_document, aws_client):
    """Return a helper creating an IAM role plus a policy, either attached as a
    managed policy or put as an inline policy. Returns (role_name, role_arn)."""

    def _create_role_with_policy(
        effect, actions, assume_policy_doc, resource=None, attach=True, iam_client=None
    ):
        client = iam_client or aws_client.iam

        role_name = f"role-{short_uid()}"
        create_response = create_role(
            RoleName=role_name, AssumeRolePolicyDocument=assume_policy_doc, iam_client=client
        )
        role_arn = create_response["Role"]["Arn"]
        policy_name = f"p-{short_uid()}"

        if attach:
            # create role and attach role policy
            policy_arn = create_policy_generated_document(
                effect, actions, policy_name=policy_name, resource=resource, iam_client=client
            )
            client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
        else:
            # put role policy
            document = json.dumps(create_policy_doc(effect, actions, resource=resource))
            client.put_role_policy(
                RoleName=role_name, PolicyName=policy_name, PolicyDocument=document
            )

        return role_name, role_arn

    return _create_role_with_policy
2337

2338

2339
@pytest.fixture
def create_user_with_policy(create_policy_generated_document, create_user, aws_client):
    """Return a helper creating an IAM user with an attached generated policy
    and a fresh access key. Returns (username, access_key)."""

    def _create_user_with_policy(effect, actions, resource=None):
        policy_arn = create_policy_generated_document(effect, actions, resource=resource)
        username = f"user-{short_uid()}"
        create_user(UserName=username)
        aws_client.iam.attach_user_policy(UserName=username, PolicyArn=policy_arn)
        access_key = aws_client.iam.create_access_key(UserName=username)["AccessKey"]
        return username, access_key

    return _create_user_with_policy
2350

2351

2352
@pytest.fixture()
def register_extension(s3_bucket, aws_client):
    """Factory fixture registering CloudFormation extensions from local artifacts.

    Uploads the artifact to S3, registers the type, waits for registration to
    complete, and sets the new version as default. Deregisters all versions of
    each registered extension on teardown.
    """
    cfn_client = aws_client.cloudformation
    registered_arns = []

    def _register(extension_name, extension_type, artifact_path):
        key = f"artifact-{short_uid()}"
        aws_client.s3.upload_file(artifact_path, s3_bucket, key)

        registration_token = cfn_client.register_type(
            Type=extension_type,
            TypeName=extension_name,
            SchemaHandlerPackage=f"s3://{s3_bucket}/{key}",
        )["RegistrationToken"]
        cfn_client.get_waiter("type_registration_complete").wait(
            RegistrationToken=registration_token
        )

        describe_response = cfn_client.describe_type_registration(
            RegistrationToken=registration_token
        )

        registered_arns.append(describe_response["TypeArn"])
        cfn_client.set_type_default_version(Arn=describe_response["TypeVersionArn"])

        return describe_response

    yield _register

    for arn in registered_arns:
        # deregister every individual version before the type itself
        for version in cfn_client.list_type_versions(Arn=arn)["TypeVersionSummaries"]:
            try:
                cfn_client.deregister_type(Arn=version["Arn"])
            except Exception:
                continue
        cfn_client.deregister_type(Arn=arn)
2393

2394

2395
@pytest.fixture
def hosted_zone(aws_client):
    """Factory fixture creating Route53 hosted zones.

    Zones are deleted in reverse creation order on teardown.
    """
    created_zone_ids = []

    def factory(**kwargs):
        if "CallerReference" not in kwargs:
            kwargs["CallerReference"] = f"ref-{short_uid()}"
        response = aws_client.route53.create_hosted_zone(**kwargs)
        created_zone_ids.append(response["HostedZone"]["Id"])
        return response

    yield factory

    # delete newest first to mirror creation order
    for zone_id in reversed(created_zone_ids):
        aws_client.route53.delete_hosted_zone(Id=zone_id)
1✔
2411

2412

2413
@pytest.fixture
def openapi_validate(monkeypatch):
    """Enable OpenAPI request/response validation for the duration of a test.

    Uses proper booleans instead of the string "true": the config flags are
    boolean-valued, and while a non-empty string is truthy, a string value
    would silently break any strict `is True` / type-sensitive check.
    monkeypatch restores the original values automatically on teardown.
    """
    monkeypatch.setattr(config, "OPENAPI_VALIDATE_RESPONSE", True)
    monkeypatch.setattr(config, "OPENAPI_VALIDATE_REQUEST", True)
1✔
2417

2418

2419
@pytest.fixture
def set_resource_custom_id():
    """Factory fixture assigning custom IDs to resources via the ID manager.

    All custom IDs registered through the factory are unset on teardown.
    """
    registered_identifiers = []

    def _set_custom_id(resource_identifier: ResourceIdentifier, custom_id):
        localstack_id_manager.set_custom_id(
            resource_identifier=resource_identifier, custom_id=custom_id
        )
        registered_identifiers.append(resource_identifier)

    yield _set_custom_id

    for resource_identifier in registered_identifiers:
        localstack_id_manager.unset_custom_id(resource_identifier)
1✔
2433

2434

2435
###############################
2436
# Events (EventBridge) fixtures
2437
###############################
2438

2439

2440
@pytest.fixture
def events_create_event_bus(aws_client, region_name, account_id):
    """Factory fixture creating EventBridge event buses.

    On teardown, every rule (with its targets) and every archive attached to
    each created bus is removed before the bus itself is deleted.
    """
    created_bus_names = []

    def _create_event_bus(**kwargs):
        if "Name" not in kwargs:
            kwargs["Name"] = f"test-event-bus-{short_uid()}"

        response = aws_client.events.create_event_bus(**kwargs)
        created_bus_names.append(kwargs["Name"])
        return response

    def _delete_rules(bus_name):
        # rules must be emptied of targets before they can be deleted
        list_response = aws_client.events.list_rules(EventBusName=bus_name)
        for rule_name in [rule["Name"] for rule in list_response["Rules"]]:
            try:
                targets_response = aws_client.events.list_targets_by_rule(
                    Rule=rule_name, EventBusName=bus_name
                )
                target_ids = [target["Id"] for target in targets_response["Targets"]]
                if target_ids:
                    for target_id in target_ids:
                        aws_client.events.remove_targets(
                            Rule=rule_name, EventBusName=bus_name, Ids=[target_id]
                        )
                aws_client.events.delete_rule(Name=rule_name, EventBusName=bus_name)
            except Exception as e:
                LOG.warning("Failed to delete rule %s: %s", rule_name, e)

    def _delete_archives(bus_name):
        source_arn = f"arn:aws:events:{region_name}:{account_id}:event-bus/{bus_name}"
        archives_response = aws_client.events.list_archives(EventSourceArn=source_arn)
        for archive_name in [archive["ArchiveName"] for archive in archives_response["Archives"]]:
            try:
                aws_client.events.delete_archive(ArchiveName=archive_name)
            except Exception as e:
                LOG.warning("Failed to delete archive %s: %s", archive_name, e)

    yield _create_event_bus

    for bus_name in created_bus_names:
        try:
            _delete_rules(bus_name)
            _delete_archives(bus_name)
            aws_client.events.delete_event_bus(Name=bus_name)
        except Exception as e:
            LOG.warning("Failed to delete event bus %s: %s", bus_name, e)
1✔
2493

2494

2495
@pytest.fixture
def events_put_rule(aws_client):
    """Factory fixture wrapping ``events.put_rule``.

    Created rules (and their targets) are removed on teardown; cleanup
    failures are logged but do not fail the test.
    """
    created_rules = []

    def _put_rule(**kwargs):
        if "Name" not in kwargs:
            kwargs["Name"] = f"rule-{short_uid()}"

        response = aws_client.events.put_rule(**kwargs)
        created_rules.append((kwargs["Name"], kwargs.get("EventBusName", "default")))
        return response

    yield _put_rule

    for rule_name, bus_name in created_rules:
        try:
            targets_response = aws_client.events.list_targets_by_rule(
                Rule=rule_name, EventBusName=bus_name
            )
            target_ids = [target["Id"] for target in targets_response["Targets"]]

            # a rule cannot be deleted while it still has targets
            if target_ids:
                for target_id in target_ids:
                    aws_client.events.remove_targets(
                        Rule=rule_name, EventBusName=bus_name, Ids=[target_id]
                    )

            aws_client.events.delete_rule(Name=rule_name, EventBusName=bus_name)
        except Exception as e:
            LOG.warning("Failed to delete rule %s: %s", rule_name, e)
1✔
2526

2527

2528
@pytest.fixture
def events_create_rule(aws_client):
    """Factory fixture creating EventBridge rules and returning the rule ARN.

    Targets are detached and rules deleted on teardown.
    """
    created_rules = []

    def _create_rule(**kwargs):
        rule_name = kwargs["Name"]
        bus_name = kwargs.get("EventBusName", "")
        pattern = kwargs.get("EventPattern", {})
        schedule = kwargs.get("ScheduleExpression", "")
        rule_arn = aws_client.events.put_rule(
            Name=rule_name,
            EventBusName=bus_name,
            EventPattern=json.dumps(pattern),
            ScheduleExpression=schedule,
        )["RuleArn"]
        created_rules.append({"name": rule_name, "bus": bus_name})
        return rule_arn

    yield _create_rule

    for rule in created_rules:
        targets = aws_client.events.list_targets_by_rule(
            Rule=rule["name"], EventBusName=rule["bus"]
        )["Targets"]

        # targets must be detached before the rule can be deleted
        target_ids = [target["Id"] for target in targets]
        if target_ids:
            aws_client.events.remove_targets(
                Rule=rule["name"], EventBusName=rule["bus"], Ids=target_ids
            )

        aws_client.events.delete_rule(Name=rule["name"], EventBusName=rule["bus"])
1✔
2560

2561

2562
@pytest.fixture
def sqs_as_events_target(aws_client, sqs_get_queue_arn):
    """Factory fixture creating an SQS queue usable as an EventBridge target.

    Returns a ``(queue_url, queue_arn)`` tuple; the queue policy grants
    ``events.amazonaws.com`` permission to send messages. Queues are deleted
    on teardown (best effort).
    """
    created_queue_urls = []

    def _sqs_as_events_target(queue_name: str | None = None) -> tuple[str, str]:
        queue_name = queue_name or f"tests-queue-{short_uid()}"
        queue_url = aws_client.sqs.create_queue(QueueName=queue_name)["QueueUrl"]
        created_queue_urls.append(queue_url)
        queue_arn = sqs_get_queue_arn(queue_url)
        # allow the EventBridge service principal to deliver to this queue
        policy = {
            "Version": "2012-10-17",
            "Id": f"sqs-eventbridge-{short_uid()}",
            "Statement": [
                {
                    "Sid": f"SendMessage-{short_uid()}",
                    "Effect": "Allow",
                    "Principal": {"Service": "events.amazonaws.com"},
                    "Action": "sqs:SendMessage",
                    "Resource": queue_arn,
                }
            ],
        }
        aws_client.sqs.set_queue_attributes(
            QueueUrl=queue_url, Attributes={"Policy": json.dumps(policy)}
        )
        return queue_url, queue_arn

    yield _sqs_as_events_target

    for queue_url in created_queue_urls:
        try:
            aws_client.sqs.delete_queue(QueueUrl=queue_url)
        except Exception as e:
            LOG.debug("error cleaning up queue %s: %s", queue_url, e)
×
2598

2599

2600
@pytest.fixture
def clean_up(
    aws_client,
):  # TODO: legacy clean up fixtures for eventbridge - remove and use individual fixtures for creating resources instead
    def _clean_up(
        bus_name=None,
        rule_name=None,
        target_ids=None,
        queue_url=None,
        log_group_name=None,
    ):
        """Best-effort removal of EventBridge-related test resources.

        Every deletion is wrapped in ``call_safe``, so individual failures
        are swallowed instead of failing the test teardown.
        """
        events_client = aws_client.events
        bus_kwargs = {"EventBusName": bus_name} if bus_name else {}
        if target_ids:
            # accept a single ID or a list of IDs
            ids = target_ids if isinstance(target_ids, list) else [target_ids]
            call_safe(
                events_client.remove_targets,
                kwargs=dict(Rule=rule_name, Ids=ids, Force=True, **bus_kwargs),
            )
        if rule_name:
            call_safe(
                events_client.delete_rule, kwargs=dict(Name=rule_name, Force=True, **bus_kwargs)
            )
        if bus_name:
            call_safe(events_client.delete_event_bus, kwargs=dict(Name=bus_name))
        if queue_url:
            call_safe(aws_client.sqs.delete_queue, kwargs=dict(QueueUrl=queue_url))
        if log_group_name:
            logs_client = aws_client.logs

            def _delete_log_group():
                # log streams are removed first, then the group itself
                streams = logs_client.describe_log_streams(logGroupName=log_group_name)
                for stream in streams["logStreams"]:
                    logs_client.delete_log_stream(
                        logGroupName=log_group_name, logStreamName=stream["logStreamName"]
                    )
                logs_client.delete_log_group(logGroupName=log_group_name)

            call_safe(_delete_log_group)

    yield _clean_up
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc