• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 18515922497

14 Oct 2025 10:19PM UTC coverage: 86.884% (-0.004%) from 86.888%
18515922497

push

github

web-flow
DynamoDB: fix snapshot skip for MA/MR global table (#13270)

68025 of 78294 relevant lines covered (86.88%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.58
/localstack-core/localstack/testing/pytest/fixtures.py
1
import contextlib
1✔
2
import dataclasses
1✔
3
import json
1✔
4
import logging
1✔
5
import os
1✔
6
import re
1✔
7
import textwrap
1✔
8
import time
1✔
9
from collections.abc import Callable
1✔
10
from typing import TYPE_CHECKING, Any, Unpack
1✔
11

12
import botocore.auth
1✔
13
import botocore.config
1✔
14
import botocore.credentials
1✔
15
import botocore.session
1✔
16
import pytest
1✔
17
from _pytest.config import Config
1✔
18
from _pytest.nodes import Item
1✔
19
from botocore.exceptions import ClientError
1✔
20
from botocore.regions import EndpointResolver
1✔
21
from pytest_httpserver import HTTPServer
1✔
22
from werkzeug import Request, Response
1✔
23

24
from localstack import config
1✔
25
from localstack.aws.api.cloudformation import CreateChangeSetInput, Parameter
1✔
26
from localstack.aws.api.ec2 import CreateSecurityGroupRequest, CreateVpcEndpointRequest, VpcEndpoint
1✔
27
from localstack.aws.connect import ServiceLevelClientFactory
1✔
28
from localstack.services.stores import (
1✔
29
    AccountRegionBundle,
30
    BaseStore,
31
    CrossAccountAttribute,
32
    CrossRegionAttribute,
33
    LocalAttribute,
34
)
35
from localstack.testing.aws.cloudformation_utils import load_template_file, render_template
1✔
36
from localstack.testing.aws.util import get_lambda_logs, is_aws_cloud
1✔
37
from localstack.testing.config import (
1✔
38
    SECONDARY_TEST_AWS_ACCOUNT_ID,
39
    SECONDARY_TEST_AWS_REGION_NAME,
40
    TEST_AWS_ACCOUNT_ID,
41
    TEST_AWS_REGION_NAME,
42
)
43
from localstack.utils import testutil
1✔
44
from localstack.utils.aws.arns import get_partition
1✔
45
from localstack.utils.aws.client import SigningHttpClient
1✔
46
from localstack.utils.aws.resources import create_dynamodb_table
1✔
47
from localstack.utils.bootstrap import is_api_enabled
1✔
48
from localstack.utils.collections import ensure_list, select_from_typed_dict
1✔
49
from localstack.utils.functions import call_safe, run_safe
1✔
50
from localstack.utils.http import safe_requests as requests
1✔
51
from localstack.utils.id_generator import ResourceIdentifier, localstack_id_manager
1✔
52
from localstack.utils.json import CustomEncoder, json_safe
1✔
53
from localstack.utils.net import wait_for_port_open
1✔
54
from localstack.utils.strings import short_uid, to_str
1✔
55
from localstack.utils.sync import ShortCircuitWaitException, poll_condition, retry, wait_until
1✔
56

57
LOG = logging.getLogger(__name__)
1✔
58

59
# URL of public HTTP echo server, used primarily for AWS parity/snapshot testing
60
PUBLIC_HTTP_ECHO_SERVER_URL = "http://httpbin.org"
1✔
61

62
WAITER_CHANGE_SET_CREATE_COMPLETE = "change_set_create_complete"
1✔
63
WAITER_STACK_CREATE_COMPLETE = "stack_create_complete"
1✔
64
WAITER_STACK_UPDATE_COMPLETE = "stack_update_complete"
1✔
65
WAITER_STACK_DELETE_COMPLETE = "stack_delete_complete"
1✔
66

67

68
if TYPE_CHECKING:
1✔
69
    from mypy_boto3_sqs import SQSClient
×
70
    from mypy_boto3_sqs.type_defs import MessageTypeDef
×
71

72

73
@pytest.fixture(scope="session")
1✔
74
def aws_client_no_retry(aws_client_factory):
1✔
75
    """
76
    This fixture can be used to obtain Boto clients with disabled retries for testing.
77
    botocore docs: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#configuring-a-retry-mode
78

79
    Use this client when testing exceptions (i.e., with pytest.raises(...)) or expected errors (e.g., status code 500)
80
    to avoid unnecessary retries and mitigate test flakiness if the tested error condition is time-bound.
81

82
    This client is needed for the following errors, exceptions, and HTTP status codes defined by the legacy retry mode:
83
    https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#legacy-retry-mode
84
    General socket/connection errors:
85
    * ConnectionError
86
    * ConnectionClosedError
87
    * ReadTimeoutError
88
    * EndpointConnectionError
89

90
    Service-side throttling/limit errors and exceptions:
91
    * Throttling
92
    * ThrottlingException
93
    * ThrottledException
94
    * RequestThrottledException
95
    * ProvisionedThroughputExceededException
96

97
    HTTP status codes: 429, 500, 502, 503, 504, and 509
98

99
    Hence, this client is not needed for a `ResourceNotFound` error (but it doesn't harm).
100
    """
101
    no_retry_config = botocore.config.Config(retries={"max_attempts": 1})
1✔
102
    return aws_client_factory(config=no_retry_config)
1✔
103

104

105
@pytest.fixture(scope="class")
1✔
106
def aws_http_client_factory(aws_session):
1✔
107
    """
108
    Returns a factory for creating new ``SigningHttpClient`` instances using a configurable botocore request signer.
109
    The default signer is a SigV4QueryAuth. The credentials are extracted from the ``boto3_sessions`` fixture that
110
    transparently uses your global profile when TEST_TARGET=AWS_CLOUD, or test credentials when running against
111
    LocalStack.
112

113
    Example invocations
114

115
        client = aws_signing_http_client_factory("sqs")
116
        client.get("http://localhost:4566/000000000000/my-queue")
117

118
    or
119
        client = aws_signing_http_client_factory("dynamodb", signer_factory=SigV4Auth)
120
        client.post("...")
121
    """
122

123
    def factory(
1✔
124
        service: str,
125
        region: str = None,
126
        signer_factory: Callable[
127
            [botocore.credentials.Credentials, str, str], botocore.auth.BaseSigner
128
        ] = botocore.auth.SigV4QueryAuth,
129
        endpoint_url: str = None,
130
        aws_access_key_id: str = None,
131
        aws_secret_access_key: str = None,
132
    ):
133
        region = region or TEST_AWS_REGION_NAME
1✔
134

135
        if aws_access_key_id or aws_secret_access_key:
1✔
136
            credentials = botocore.credentials.Credentials(
1✔
137
                access_key=aws_access_key_id, secret_key=aws_secret_access_key
138
            )
139
        else:
140
            credentials = aws_session.get_credentials()
1✔
141

142
        creds = credentials.get_frozen_credentials()
1✔
143

144
        if not endpoint_url:
1✔
145
            if is_aws_cloud():
1✔
146
                # FIXME: this is a bit raw. we should probably re-use boto in a better way
147
                resolver: EndpointResolver = aws_session._session.get_component("endpoint_resolver")
×
148
                endpoint_url = "https://" + resolver.construct_endpoint(service, region)["hostname"]
×
149
            else:
150
                endpoint_url = config.internal_service_url()
1✔
151

152
        return SigningHttpClient(signer_factory(creds, service, region), endpoint_url=endpoint_url)
1✔
153

154
    return factory
1✔
155

156

157
@pytest.fixture(scope="class")
1✔
158
def s3_vhost_client(aws_client_factory, region_name):
1✔
159
    return aws_client_factory(
1✔
160
        config=botocore.config.Config(s3={"addressing_style": "virtual"}), region_name=region_name
161
    ).s3
162

163

164
@pytest.fixture
1✔
165
def dynamodb_wait_for_table_active(aws_client):
1✔
166
    def wait_for_table_active(table_name: str, client=None):
1✔
167
        def wait():
1✔
168
            return (client or aws_client.dynamodb).describe_table(TableName=table_name)["Table"][
1✔
169
                "TableStatus"
170
            ] == "ACTIVE"
171

172
        poll_condition(wait, timeout=30)
1✔
173

174
    return wait_for_table_active
1✔
175

176

177
@pytest.fixture
1✔
178
def dynamodb_create_table_with_parameters(dynamodb_wait_for_table_active, aws_client):
1✔
179
    tables = []
1✔
180

181
    def factory(**kwargs):
1✔
182
        if "TableName" not in kwargs:
1✔
183
            kwargs["TableName"] = f"test-table-{short_uid()}"
×
184

185
        tables.append(kwargs["TableName"])
1✔
186
        response = aws_client.dynamodb.create_table(**kwargs)
1✔
187
        dynamodb_wait_for_table_active(kwargs["TableName"])
1✔
188
        return response
1✔
189

190
    yield factory
1✔
191

192
    # cleanup
193
    for table in tables:
1✔
194
        try:
1✔
195
            # table has to be in ACTIVE state before deletion
196
            dynamodb_wait_for_table_active(table)
1✔
197
            aws_client.dynamodb.delete_table(TableName=table)
1✔
198
        except Exception as e:
1✔
199
            LOG.debug("error cleaning up table %s: %s", table, e)
1✔
200

201

202
@pytest.fixture
1✔
203
def dynamodb_create_table(dynamodb_wait_for_table_active, aws_client):
1✔
204
    # beware, this swallows exception in create_dynamodb_table utility function
205
    tables = []
1✔
206

207
    def factory(**kwargs):
1✔
208
        kwargs["client"] = aws_client.dynamodb
1✔
209
        if "table_name" not in kwargs:
1✔
210
            kwargs["table_name"] = f"test-table-{short_uid()}"
1✔
211
        if "partition_key" not in kwargs:
1✔
212
            kwargs["partition_key"] = "id"
1✔
213

214
        tables.append(kwargs["table_name"])
1✔
215

216
        return create_dynamodb_table(**kwargs)
1✔
217

218
    yield factory
1✔
219

220
    # cleanup
221
    for table in tables:
1✔
222
        try:
1✔
223
            # table has to be in ACTIVE state before deletion
224
            dynamodb_wait_for_table_active(table)
1✔
225
            aws_client.dynamodb.delete_table(TableName=table)
1✔
226
        except Exception as e:
1✔
227
            LOG.debug("error cleaning up table %s: %s", table, e)
1✔
228

229

230
@pytest.fixture
1✔
231
def s3_create_bucket(s3_empty_bucket, aws_client):
1✔
232
    buckets = []
1✔
233

234
    def factory(**kwargs) -> str:
1✔
235
        if "Bucket" not in kwargs:
1✔
236
            kwargs["Bucket"] = f"test-bucket-{short_uid()}"
1✔
237

238
        if (
1✔
239
            "CreateBucketConfiguration" not in kwargs
240
            and aws_client.s3.meta.region_name != "us-east-1"
241
        ):
242
            kwargs["CreateBucketConfiguration"] = {
×
243
                "LocationConstraint": aws_client.s3.meta.region_name
244
            }
245

246
        aws_client.s3.create_bucket(**kwargs)
1✔
247
        buckets.append(kwargs["Bucket"])
1✔
248
        return kwargs["Bucket"]
1✔
249

250
    yield factory
1✔
251

252
    # cleanup
253
    for bucket in buckets:
1✔
254
        try:
1✔
255
            s3_empty_bucket(bucket)
1✔
256
            aws_client.s3.delete_bucket(Bucket=bucket)
1✔
257
        except Exception as e:
1✔
258
            LOG.debug("error cleaning up bucket %s: %s", bucket, e)
1✔
259

260

261
@pytest.fixture
1✔
262
def s3_create_bucket_with_client(s3_empty_bucket, aws_client):
1✔
263
    buckets = []
1✔
264

265
    def factory(s3_client, **kwargs) -> str:
1✔
266
        if "Bucket" not in kwargs:
1✔
267
            kwargs["Bucket"] = f"test-bucket-{short_uid()}"
×
268

269
        response = s3_client.create_bucket(**kwargs)
1✔
270
        buckets.append(kwargs["Bucket"])
1✔
271
        return response
1✔
272

273
    yield factory
1✔
274

275
    # cleanup
276
    for bucket in buckets:
1✔
277
        try:
1✔
278
            s3_empty_bucket(bucket)
1✔
279
            aws_client.s3.delete_bucket(Bucket=bucket)
1✔
280
        except Exception as e:
×
281
            LOG.debug("error cleaning up bucket %s: %s", bucket, e)
×
282

283

284
@pytest.fixture
1✔
285
def s3_bucket(s3_create_bucket, aws_client) -> str:
1✔
286
    region = aws_client.s3.meta.region_name
1✔
287
    kwargs = {}
1✔
288
    if region != "us-east-1":
1✔
289
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
×
290
    return s3_create_bucket(**kwargs)
1✔
291

292

293
@pytest.fixture
1✔
294
def s3_empty_bucket(aws_client):
1✔
295
    """
296
    Returns a factory that given a bucket name, deletes all objects and deletes all object versions
297
    """
298

299
    # Boto resource would make this a straightforward task, but our internal client does not support Boto resource
300
    # FIXME: this won't work when bucket has more than 1000 objects
301
    def factory(bucket_name: str):
1✔
302
        kwargs = {}
1✔
303
        try:
1✔
304
            aws_client.s3.get_object_lock_configuration(Bucket=bucket_name)
1✔
305
            kwargs["BypassGovernanceRetention"] = True
1✔
306
        except ClientError:
1✔
307
            pass
1✔
308

309
        response = aws_client.s3.list_objects_v2(Bucket=bucket_name)
1✔
310
        objects = [{"Key": obj["Key"]} for obj in response.get("Contents", [])]
1✔
311
        if objects:
1✔
312
            aws_client.s3.delete_objects(
1✔
313
                Bucket=bucket_name,
314
                Delete={"Objects": objects},
315
                **kwargs,
316
            )
317

318
        response = aws_client.s3.list_object_versions(Bucket=bucket_name)
1✔
319
        versions = response.get("Versions", [])
1✔
320
        versions.extend(response.get("DeleteMarkers", []))
1✔
321

322
        object_versions = [{"Key": obj["Key"], "VersionId": obj["VersionId"]} for obj in versions]
1✔
323
        if object_versions:
1✔
324
            aws_client.s3.delete_objects(
1✔
325
                Bucket=bucket_name,
326
                Delete={"Objects": object_versions},
327
                **kwargs,
328
            )
329

330
    yield factory
1✔
331

332

333
@pytest.fixture
1✔
334
def sqs_create_queue(aws_client):
1✔
335
    queue_urls = []
1✔
336

337
    def factory(**kwargs):
1✔
338
        if "QueueName" not in kwargs:
1✔
339
            kwargs["QueueName"] = f"test-queue-{short_uid()}"
1✔
340

341
        response = aws_client.sqs.create_queue(**kwargs)
1✔
342
        url = response["QueueUrl"]
1✔
343
        queue_urls.append(url)
1✔
344

345
        return url
1✔
346

347
    yield factory
1✔
348

349
    # cleanup
350
    for queue_url in queue_urls:
1✔
351
        try:
1✔
352
            aws_client.sqs.delete_queue(QueueUrl=queue_url)
1✔
353
        except Exception as e:
1✔
354
            LOG.debug("error cleaning up queue %s: %s", queue_url, e)
1✔
355

356

357
@pytest.fixture
1✔
358
def sqs_receive_messages_delete(aws_client):
1✔
359
    def factory(
1✔
360
        queue_url: str,
361
        expected_messages: int | None = None,
362
        wait_time: int | None = 5,
363
    ):
364
        response = aws_client.sqs.receive_message(
1✔
365
            QueueUrl=queue_url,
366
            MessageAttributeNames=["All"],
367
            VisibilityTimeout=0,
368
            WaitTimeSeconds=wait_time,
369
        )
370
        messages = []
1✔
371
        for m in response["Messages"]:
1✔
372
            message = json.loads(to_str(m["Body"]))
1✔
373
            messages.append(message)
1✔
374

375
        if expected_messages is not None:
1✔
376
            assert len(messages) == expected_messages
×
377

378
        for message in response["Messages"]:
1✔
379
            aws_client.sqs.delete_message(
1✔
380
                QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"]
381
            )
382

383
        return messages
1✔
384

385
    return factory
1✔
386

387

388
@pytest.fixture
1✔
389
def sqs_receive_num_messages(sqs_receive_messages_delete):
1✔
390
    def factory(queue_url: str, expected_messages: int, max_iterations: int = 3):
1✔
391
        all_messages = []
1✔
392
        for _ in range(max_iterations):
1✔
393
            try:
1✔
394
                messages = sqs_receive_messages_delete(queue_url, wait_time=5)
1✔
395
            except KeyError:
×
396
                # there were no messages
397
                continue
×
398
            all_messages.extend(messages)
1✔
399

400
            if len(all_messages) >= expected_messages:
1✔
401
                return all_messages[:expected_messages]
1✔
402

403
        raise AssertionError(f"max iterations reached with {len(all_messages)} messages received")
×
404

405
    return factory
1✔
406

407

408
@pytest.fixture
1✔
409
def sqs_collect_messages(aws_client):
1✔
410
    """Collects SQS messages from a given queue_url and deletes them by default.
411
    Example usage:
412
    messages = sqs_collect_messages(
413
         my_queue_url,
414
         expected=2,
415
         timeout=10,
416
         attribute_names=["All"],
417
         message_attribute_names=["All"],
418
    )
419
    """
420

421
    def factory(
1✔
422
        queue_url: str,
423
        expected: int,
424
        timeout: int,
425
        delete: bool = True,
426
        attribute_names: list[str] = None,
427
        message_attribute_names: list[str] = None,
428
        max_number_of_messages: int = 1,
429
        wait_time_seconds: int = 5,
430
        sqs_client: "SQSClient | None" = None,
431
    ) -> list["MessageTypeDef"]:
432
        sqs_client = sqs_client or aws_client.sqs
1✔
433
        collected = []
1✔
434

435
        def _receive():
1✔
436
            response = sqs_client.receive_message(
1✔
437
                QueueUrl=queue_url,
438
                # Maximum is 20 seconds. Performs long polling.
439
                WaitTimeSeconds=wait_time_seconds,
440
                # Maximum 10 messages
441
                MaxNumberOfMessages=max_number_of_messages,
442
                AttributeNames=attribute_names or [],
443
                MessageAttributeNames=message_attribute_names or [],
444
            )
445

446
            if messages := response.get("Messages"):
1✔
447
                collected.extend(messages)
1✔
448

449
                if delete:
1✔
450
                    for m in messages:
1✔
451
                        sqs_client.delete_message(
1✔
452
                            QueueUrl=queue_url, ReceiptHandle=m["ReceiptHandle"]
453
                        )
454

455
            return len(collected) >= expected
1✔
456

457
        if not poll_condition(_receive, timeout=timeout):
1✔
458
            raise TimeoutError(
×
459
                f"gave up waiting for messages (expected={expected}, actual={len(collected)}"
460
            )
461

462
        return collected
1✔
463

464
    yield factory
1✔
465

466

467
@pytest.fixture
1✔
468
def sqs_queue(sqs_create_queue):
1✔
469
    return sqs_create_queue()
1✔
470

471

472
@pytest.fixture
1✔
473
def sqs_get_queue_arn(aws_client) -> Callable:
1✔
474
    def _get_queue_arn(queue_url: str) -> str:
1✔
475
        return aws_client.sqs.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["QueueArn"])[
1✔
476
            "Attributes"
477
        ]["QueueArn"]
478

479
    return _get_queue_arn
1✔
480

481

482
@pytest.fixture
1✔
483
def sqs_queue_exists(aws_client):
1✔
484
    def _queue_exists(queue_url: str) -> bool:
1✔
485
        """
486
        Checks whether a queue with the given queue URL exists.
487
        :param queue_url: the queue URL
488
        :return: true if the queue exists, false otherwise
489
        """
490
        try:
1✔
491
            result = aws_client.sqs.get_queue_url(QueueName=queue_url.split("/")[-1])
1✔
492
            return result.get("QueueUrl") == queue_url
×
493
        except ClientError as e:
1✔
494
            if "NonExistentQueue" in e.response["Error"]["Code"]:
1✔
495
                return False
1✔
496
            raise
×
497

498
    yield _queue_exists
1✔
499

500

501
@pytest.fixture
1✔
502
def sns_create_topic(aws_client):
1✔
503
    topic_arns = []
1✔
504

505
    def _create_topic(**kwargs):
1✔
506
        if "Name" not in kwargs:
1✔
507
            kwargs["Name"] = f"test-topic-{short_uid()}"
1✔
508
        response = aws_client.sns.create_topic(**kwargs)
1✔
509
        topic_arns.append(response["TopicArn"])
1✔
510
        return response
1✔
511

512
    yield _create_topic
1✔
513

514
    for topic_arn in topic_arns:
1✔
515
        try:
1✔
516
            aws_client.sns.delete_topic(TopicArn=topic_arn)
1✔
517
        except Exception as e:
×
518
            LOG.debug("error cleaning up topic %s: %s", topic_arn, e)
×
519

520

521
@pytest.fixture
1✔
522
def sns_wait_for_topic_delete(aws_client):
1✔
523
    def wait_for_topic_delete(topic_arn: str) -> None:
1✔
524
        def wait():
1✔
525
            try:
1✔
526
                aws_client.sns.get_topic_attributes(TopicArn=topic_arn)
1✔
527
                return False
×
528
            except Exception as e:
1✔
529
                if "NotFound" in e.response["Error"]["Code"]:
1✔
530
                    return True
1✔
531

532
                raise
×
533

534
        poll_condition(wait, timeout=30)
1✔
535

536
    return wait_for_topic_delete
1✔
537

538

539
@pytest.fixture
1✔
540
def sns_subscription(aws_client):
1✔
541
    sub_arns = []
1✔
542

543
    def _create_sub(**kwargs):
1✔
544
        if kwargs.get("ReturnSubscriptionArn") is None:
1✔
545
            kwargs["ReturnSubscriptionArn"] = True
1✔
546

547
        # requires 'TopicArn', 'Protocol', and 'Endpoint'
548
        response = aws_client.sns.subscribe(**kwargs)
1✔
549
        sub_arn = response["SubscriptionArn"]
1✔
550
        sub_arns.append(sub_arn)
1✔
551
        return response
1✔
552

553
    yield _create_sub
1✔
554

555
    for sub_arn in sub_arns:
1✔
556
        try:
1✔
557
            aws_client.sns.unsubscribe(SubscriptionArn=sub_arn)
1✔
558
        except Exception as e:
1✔
559
            LOG.debug("error cleaning up subscription %s: %s", sub_arn, e)
1✔
560

561

562
@pytest.fixture
1✔
563
def sns_topic(sns_create_topic, aws_client):
1✔
564
    topic_arn = sns_create_topic()["TopicArn"]
1✔
565
    return aws_client.sns.get_topic_attributes(TopicArn=topic_arn)
1✔
566

567

568
@pytest.fixture
1✔
569
def sns_allow_topic_sqs_queue(aws_client):
1✔
570
    def _allow_sns_topic(sqs_queue_url, sqs_queue_arn, sns_topic_arn) -> None:
1✔
571
        # allow topic to write to sqs queue
572
        aws_client.sqs.set_queue_attributes(
1✔
573
            QueueUrl=sqs_queue_url,
574
            Attributes={
575
                "Policy": json.dumps(
576
                    {
577
                        "Statement": [
578
                            {
579
                                "Effect": "Allow",
580
                                "Principal": {"Service": "sns.amazonaws.com"},
581
                                "Action": "sqs:SendMessage",
582
                                "Resource": sqs_queue_arn,
583
                                "Condition": {"ArnEquals": {"aws:SourceArn": sns_topic_arn}},
584
                            }
585
                        ]
586
                    }
587
                )
588
            },
589
        )
590

591
    return _allow_sns_topic
1✔
592

593

594
@pytest.fixture
1✔
595
def sns_create_sqs_subscription(sns_allow_topic_sqs_queue, sqs_get_queue_arn, aws_client):
1✔
596
    subscriptions = []
1✔
597

598
    def _factory(topic_arn: str, queue_url: str, **kwargs) -> dict[str, str]:
1✔
599
        queue_arn = sqs_get_queue_arn(queue_url)
1✔
600

601
        # connect sns topic to sqs
602
        subscription = aws_client.sns.subscribe(
1✔
603
            TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn, **kwargs
604
        )
605
        subscription_arn = subscription["SubscriptionArn"]
1✔
606

607
        # allow topic to write to sqs queue
608
        sns_allow_topic_sqs_queue(
1✔
609
            sqs_queue_url=queue_url, sqs_queue_arn=queue_arn, sns_topic_arn=topic_arn
610
        )
611

612
        subscriptions.append(subscription_arn)
1✔
613
        return aws_client.sns.get_subscription_attributes(SubscriptionArn=subscription_arn)[
1✔
614
            "Attributes"
615
        ]
616

617
    yield _factory
1✔
618

619
    for arn in subscriptions:
1✔
620
        try:
1✔
621
            aws_client.sns.unsubscribe(SubscriptionArn=arn)
1✔
622
        except Exception as e:
×
623
            LOG.error("error cleaning up subscription %s: %s", arn, e)
×
624

625

626
@pytest.fixture
1✔
627
def sns_create_http_endpoint(sns_create_topic, sns_subscription, aws_client):
1✔
628
    # This fixture can be used with manual setup to expose the HTTPServer fixture to AWS. One example is to use a
629
    # a service like localhost.run, and set up a specific port to start the `HTTPServer(port=40000)` for example,
630
    # and tunnel `localhost:40000` to a specific domain that you can manually return from this fixture.
631
    http_servers = []
1✔
632

633
    def _create_http_endpoint(
1✔
634
        raw_message_delivery: bool = False,
635
    ) -> tuple[str, str, str, HTTPServer]:
636
        server = HTTPServer()
1✔
637
        server.start()
1✔
638
        http_servers.append(server)
1✔
639
        server.expect_request("/sns-endpoint").respond_with_data(status=200)
1✔
640
        endpoint_url = server.url_for("/sns-endpoint")
1✔
641
        wait_for_port_open(endpoint_url)
1✔
642

643
        topic_arn = sns_create_topic()["TopicArn"]
1✔
644
        subscription = sns_subscription(TopicArn=topic_arn, Protocol="http", Endpoint=endpoint_url)
1✔
645
        subscription_arn = subscription["SubscriptionArn"]
1✔
646
        delivery_policy = {
1✔
647
            "healthyRetryPolicy": {
648
                "minDelayTarget": 1,
649
                "maxDelayTarget": 1,
650
                "numRetries": 0,
651
                "numNoDelayRetries": 0,
652
                "numMinDelayRetries": 0,
653
                "numMaxDelayRetries": 0,
654
                "backoffFunction": "linear",
655
            },
656
            "sicklyRetryPolicy": None,
657
            "throttlePolicy": {"maxReceivesPerSecond": 1000},
658
            "guaranteed": False,
659
        }
660
        aws_client.sns.set_subscription_attributes(
1✔
661
            SubscriptionArn=subscription_arn,
662
            AttributeName="DeliveryPolicy",
663
            AttributeValue=json.dumps(delivery_policy),
664
        )
665

666
        if raw_message_delivery:
1✔
667
            aws_client.sns.set_subscription_attributes(
1✔
668
                SubscriptionArn=subscription_arn,
669
                AttributeName="RawMessageDelivery",
670
                AttributeValue="true",
671
            )
672

673
        return topic_arn, subscription_arn, endpoint_url, server
1✔
674

675
    yield _create_http_endpoint
1✔
676

677
    for http_server in http_servers:
1✔
678
        if http_server.is_running():
1✔
679
            http_server.stop()
1✔
680

681

682
@pytest.fixture
1✔
683
def route53_hosted_zone(aws_client):
1✔
684
    hosted_zones = []
1✔
685

686
    def factory(**kwargs):
1✔
687
        if "Name" not in kwargs:
1✔
688
            kwargs["Name"] = f"www.{short_uid()}.com."
1✔
689
        if "CallerReference" not in kwargs:
1✔
690
            kwargs["CallerReference"] = f"caller-ref-{short_uid()}"
1✔
691
        response = aws_client.route53.create_hosted_zone(
1✔
692
            Name=kwargs["Name"], CallerReference=kwargs["CallerReference"]
693
        )
694
        hosted_zones.append(response["HostedZone"]["Id"])
1✔
695
        return response
1✔
696

697
    yield factory
1✔
698

699
    for zone in hosted_zones:
1✔
700
        try:
1✔
701
            aws_client.route53.delete_hosted_zone(Id=zone)
1✔
702
        except Exception as e:
1✔
703
            LOG.debug("error cleaning up route53 HostedZone %s: %s", zone, e)
1✔
704

705

706
@pytest.fixture
1✔
707
def transcribe_create_job(s3_bucket, aws_client):
1✔
708
    job_names = []
1✔
709

710
    def _create_job(audio_file: str, params: dict[str, Any] | None = None) -> str:
1✔
711
        s3_key = "test-clip.wav"
1✔
712

713
        if not params:
1✔
714
            params = {}
1✔
715

716
        if "TranscriptionJobName" not in params:
1✔
717
            params["TranscriptionJobName"] = f"test-transcribe-{short_uid()}"
1✔
718

719
        if "LanguageCode" not in params:
1✔
720
            params["LanguageCode"] = "en-GB"
1✔
721

722
        if "Media" not in params:
1✔
723
            params["Media"] = {"MediaFileUri": f"s3://{s3_bucket}/{s3_key}"}
1✔
724

725
        # upload test wav to a s3 bucket
726
        with open(audio_file, "rb") as f:
1✔
727
            aws_client.s3.upload_fileobj(f, s3_bucket, s3_key)
1✔
728

729
        response = aws_client.transcribe.start_transcription_job(**params)
1✔
730

731
        job_name = response["TranscriptionJob"]["TranscriptionJobName"]
1✔
732
        job_names.append(job_name)
1✔
733

734
        return job_name
1✔
735

736
    yield _create_job
1✔
737

738
    for job_name in job_names:
1✔
739
        with contextlib.suppress(ClientError):
1✔
740
            aws_client.transcribe.delete_transcription_job(TranscriptionJobName=job_name)
1✔
741

742

743
@pytest.fixture
1✔
744
def kinesis_create_stream(aws_client):
1✔
745
    stream_names = []
1✔
746

747
    def _create_stream(**kwargs):
1✔
748
        if "StreamName" not in kwargs:
1✔
749
            kwargs["StreamName"] = f"test-stream-{short_uid()}"
1✔
750
        aws_client.kinesis.create_stream(**kwargs)
1✔
751
        stream_names.append(kwargs["StreamName"])
1✔
752
        return kwargs["StreamName"]
1✔
753

754
    yield _create_stream
1✔
755

756
    for stream_name in stream_names:
1✔
757
        try:
1✔
758
            aws_client.kinesis.delete_stream(StreamName=stream_name, EnforceConsumerDeletion=True)
1✔
759
        except Exception as e:
1✔
760
            LOG.debug("error cleaning up kinesis stream %s: %s", stream_name, e)
1✔
761

762

763
@pytest.fixture
1✔
764
def wait_for_stream_ready(aws_client):
1✔
765
    def _wait_for_stream_ready(stream_name: str):
1✔
766
        def is_stream_ready():
1✔
767
            describe_stream_response = aws_client.kinesis.describe_stream(StreamName=stream_name)
1✔
768
            return describe_stream_response["StreamDescription"]["StreamStatus"] in [
1✔
769
                "ACTIVE",
770
                "UPDATING",
771
            ]
772

773
        return poll_condition(is_stream_ready)
1✔
774

775
    return _wait_for_stream_ready
1✔
776

777

778
@pytest.fixture
1✔
779
def wait_for_delivery_stream_ready(aws_client):
1✔
780
    def _wait_for_stream_ready(delivery_stream_name: str):
1✔
781
        def is_stream_ready():
1✔
782
            describe_stream_response = aws_client.firehose.describe_delivery_stream(
1✔
783
                DeliveryStreamName=delivery_stream_name
784
            )
785
            return (
1✔
786
                describe_stream_response["DeliveryStreamDescription"]["DeliveryStreamStatus"]
787
                == "ACTIVE"
788
            )
789

790
        poll_condition(is_stream_ready)
1✔
791

792
    return _wait_for_stream_ready
1✔
793

794

795
@pytest.fixture
1✔
796
def wait_for_dynamodb_stream_ready(aws_client):
1✔
797
    def _wait_for_stream_ready(stream_arn: str, client=None):
1✔
798
        def is_stream_ready():
1✔
799
            ddb_client = client or aws_client.dynamodbstreams
1✔
800
            describe_stream_response = ddb_client.describe_stream(StreamArn=stream_arn)
1✔
801
            return describe_stream_response["StreamDescription"]["StreamStatus"] == "ENABLED"
1✔
802

803
        return poll_condition(is_stream_ready)
1✔
804

805
    return _wait_for_stream_ready
1✔
806

807

808
@pytest.fixture()
1✔
809
def kms_create_key(aws_client_factory):
1✔
810
    key_ids = []
1✔
811

812
    def _create_key(region_name: str = None, **kwargs):
1✔
813
        if "Description" not in kwargs:
1✔
814
            kwargs["Description"] = f"test description - {short_uid()}"
1✔
815
        key_metadata = aws_client_factory(region_name=region_name).kms.create_key(**kwargs)[
1✔
816
            "KeyMetadata"
817
        ]
818
        key_ids.append((region_name, key_metadata["KeyId"]))
1✔
819
        return key_metadata
1✔
820

821
    yield _create_key
1✔
822

823
    for region_name, key_id in key_ids:
1✔
824
        try:
1✔
825
            # shortest amount of time you can schedule the deletion
826
            aws_client_factory(region_name=region_name).kms.schedule_key_deletion(
1✔
827
                KeyId=key_id, PendingWindowInDays=7
828
            )
829
        except Exception as e:
1✔
830
            exception_message = str(e)
1✔
831
            # Some tests schedule their keys for deletion themselves.
832
            if (
1✔
833
                "KMSInvalidStateException" not in exception_message
834
                or "is pending deletion" not in exception_message
835
            ):
836
                LOG.debug("error cleaning up KMS key %s: %s", key_id, e)
×
837

838

839
@pytest.fixture()
1✔
840
def kms_replicate_key(aws_client_factory):
1✔
841
    key_ids = []
1✔
842

843
    def _replicate_key(region_from=None, **kwargs):
1✔
844
        region_to = kwargs.get("ReplicaRegion")
1✔
845
        key_ids.append((region_to, kwargs.get("KeyId")))
1✔
846
        return aws_client_factory(region_name=region_from).kms.replicate_key(**kwargs)
1✔
847

848
    yield _replicate_key
1✔
849

850
    for region_to, key_id in key_ids:
1✔
851
        try:
1✔
852
            # shortest amount of time you can schedule the deletion
853
            aws_client_factory(region_name=region_to).kms.schedule_key_deletion(
1✔
854
                KeyId=key_id, PendingWindowInDays=7
855
            )
856
        except Exception as e:
×
857
            LOG.debug("error cleaning up KMS key %s: %s", key_id, e)
×
858

859

860
# kms_create_key fixture is used here not just to be able to create aliases without a key specified,
861
# but also to make sure that kms_create_key gets executed before and teared down after kms_create_alias -
862
# to make sure that we clean up aliases before keys get cleaned up.
863
@pytest.fixture()
1✔
864
def kms_create_alias(kms_create_key, aws_client):
1✔
865
    aliases = []
1✔
866

867
    def _create_alias(**kwargs):
1✔
868
        if "AliasName" not in kwargs:
1✔
869
            kwargs["AliasName"] = f"alias/{short_uid()}"
1✔
870
        if "TargetKeyId" not in kwargs:
1✔
871
            kwargs["TargetKeyId"] = kms_create_key()["KeyId"]
1✔
872

873
        aws_client.kms.create_alias(**kwargs)
1✔
874
        aliases.append(kwargs["AliasName"])
1✔
875
        return kwargs["AliasName"]
1✔
876

877
    yield _create_alias
1✔
878

879
    for alias in aliases:
1✔
880
        try:
1✔
881
            aws_client.kms.delete_alias(AliasName=alias)
1✔
882
        except Exception as e:
1✔
883
            LOG.debug("error cleaning up KMS alias %s: %s", alias, e)
1✔
884

885

886
@pytest.fixture()
1✔
887
def kms_create_grant(kms_create_key, aws_client):
1✔
888
    grants = []
1✔
889

890
    def _create_grant(**kwargs):
1✔
891
        # Just a random ARN, since KMS in LocalStack currently doesn't validate GranteePrincipal,
892
        # but some GranteePrincipal is required to create a grant.
893
        GRANTEE_PRINCIPAL_ARN = (
1✔
894
            "arn:aws:kms:eu-central-1:123456789876:key/198a5a78-52c3-489f-ac70-b06a4d11027a"
895
        )
896

897
        if "Operations" not in kwargs:
1✔
898
            kwargs["Operations"] = ["Decrypt", "Encrypt"]
1✔
899
        if "GranteePrincipal" not in kwargs:
1✔
900
            kwargs["GranteePrincipal"] = GRANTEE_PRINCIPAL_ARN
1✔
901
        if "KeyId" not in kwargs:
1✔
902
            kwargs["KeyId"] = kms_create_key()["KeyId"]
×
903

904
        grant_id = aws_client.kms.create_grant(**kwargs)["GrantId"]
1✔
905
        grants.append((grant_id, kwargs["KeyId"]))
1✔
906
        return grant_id, kwargs["KeyId"]
1✔
907

908
    yield _create_grant
1✔
909

910
    for grant_id, key_id in grants:
1✔
911
        try:
1✔
912
            aws_client.kms.retire_grant(GrantId=grant_id, KeyId=key_id)
1✔
913
        except Exception as e:
×
914
            LOG.debug("error cleaning up KMS grant %s: %s", grant_id, e)
×
915

916

917
@pytest.fixture
1✔
918
def kms_key(kms_create_key):
1✔
919
    return kms_create_key()
1✔
920

921

922
@pytest.fixture
1✔
923
def kms_grant_and_key(kms_key, aws_client):
1✔
924
    user_arn = aws_client.sts.get_caller_identity()["Arn"]
1✔
925

926
    return [
1✔
927
        aws_client.kms.create_grant(
928
            KeyId=kms_key["KeyId"],
929
            GranteePrincipal=user_arn,
930
            Operations=["Decrypt", "Encrypt"],
931
        ),
932
        kms_key,
933
    ]
934

935

936
@pytest.fixture
1✔
937
def opensearch_wait_for_cluster(aws_client):
1✔
938
    def _wait_for_cluster(domain_name: str):
1✔
939
        def finished_processing():
1✔
940
            status = aws_client.opensearch.describe_domain(DomainName=domain_name)["DomainStatus"]
1✔
941
            return status["DomainProcessingStatus"] == "Active" and "Endpoint" in status
1✔
942

943
        assert poll_condition(
1✔
944
            finished_processing, timeout=25 * 60, **({"interval": 10} if is_aws_cloud() else {})
945
        ), f"could not start domain: {domain_name}"
946

947
    return _wait_for_cluster
1✔
948

949

950
@pytest.fixture
1✔
951
def opensearch_create_domain(opensearch_wait_for_cluster, aws_client):
1✔
952
    domains = []
1✔
953

954
    def factory(**kwargs) -> str:
1✔
955
        if "DomainName" not in kwargs:
1✔
956
            kwargs["DomainName"] = f"test-domain-{short_uid()}"
1✔
957

958
        aws_client.opensearch.create_domain(**kwargs)
1✔
959

960
        opensearch_wait_for_cluster(domain_name=kwargs["DomainName"])
1✔
961

962
        domains.append(kwargs["DomainName"])
1✔
963
        return kwargs["DomainName"]
1✔
964

965
    yield factory
1✔
966

967
    # cleanup
968
    for domain in domains:
1✔
969
        try:
1✔
970
            aws_client.opensearch.delete_domain(DomainName=domain)
1✔
971
        except Exception as e:
×
972
            LOG.debug("error cleaning up domain %s: %s", domain, e)
×
973

974

975
@pytest.fixture
1✔
976
def opensearch_domain(opensearch_create_domain) -> str:
1✔
977
    return opensearch_create_domain()
1✔
978

979

980
@pytest.fixture
1✔
981
def opensearch_endpoint(opensearch_domain, aws_client) -> str:
1✔
982
    status = aws_client.opensearch.describe_domain(DomainName=opensearch_domain)["DomainStatus"]
1✔
983
    assert "Endpoint" in status
1✔
984
    return f"https://{status['Endpoint']}"
1✔
985

986

987
@pytest.fixture
1✔
988
def opensearch_document_path(opensearch_endpoint, aws_client):
1✔
989
    document = {
1✔
990
        "first_name": "Boba",
991
        "last_name": "Fett",
992
        "age": 41,
993
        "about": "I'm just a simple man, trying to make my way in the universe.",
994
        "interests": ["mandalorian armor", "tusken culture"],
995
    }
996
    document_path = f"{opensearch_endpoint}/bountyhunters/_doc/1"
1✔
997
    response = requests.put(
1✔
998
        document_path,
999
        data=json.dumps(document),
1000
        headers={"content-type": "application/json", "Accept-encoding": "identity"},
1001
    )
1002
    assert response.status_code == 201, f"could not create document at: {document_path}"
1✔
1003
    return document_path
1✔
1004

1005

1006
# Cleanup fixtures
1007
@pytest.fixture
1✔
1008
def cleanup_stacks(aws_client):
1✔
1009
    def _cleanup_stacks(stacks: list[str]) -> None:
1✔
1010
        stacks = ensure_list(stacks)
1✔
1011
        for stack in stacks:
1✔
1012
            try:
1✔
1013
                aws_client.cloudformation.delete_stack(StackName=stack)
1✔
1014
                aws_client.cloudformation.get_waiter("stack_delete_complete").wait(StackName=stack)
1✔
1015
            except Exception:
×
1016
                LOG.debug("Failed to cleanup stack '%s'", stack)
×
1017

1018
    return _cleanup_stacks
1✔
1019

1020

1021
@pytest.fixture
1✔
1022
def cleanup_changesets(aws_client):
1✔
1023
    def _cleanup_changesets(changesets: list[str]) -> None:
1✔
1024
        changesets = ensure_list(changesets)
1✔
1025
        for cs in changesets:
1✔
1026
            try:
1✔
1027
                aws_client.cloudformation.delete_change_set(ChangeSetName=cs)
1✔
1028
            except Exception:
1✔
1029
                LOG.debug("Failed to cleanup changeset '%s'", cs)
1✔
1030

1031
    return _cleanup_changesets
1✔
1032

1033

1034
# Helpers for Cfn
1035

1036

1037
# TODO: exports(!)
1038
@dataclasses.dataclass(frozen=True)
1✔
1039
class DeployResult:
1✔
1040
    change_set_id: str
1✔
1041
    stack_id: str
1✔
1042
    stack_name: str
1✔
1043
    change_set_name: str
1✔
1044
    outputs: dict[str, str]
1✔
1045

1046
    destroy: Callable[[], None]
1✔
1047

1048

1049
class StackDeployError(Exception):
1✔
1050
    def __init__(self, describe_res: dict, events: list[dict]):
1✔
1051
        self.describe_result = describe_res
1✔
1052
        self.events = events
1✔
1053

1054
        encoded_describe_output = json.dumps(self.describe_result, cls=CustomEncoder)
1✔
1055
        if config.CFN_VERBOSE_ERRORS:
1✔
1056
            msg = f"Describe output:\n{encoded_describe_output}\nEvents:\n{self.format_events(events)}"
×
1057
        else:
1058
            msg = f"Describe output:\n{encoded_describe_output}\nFailing resources:\n{self.format_events(events)}"
1✔
1059

1060
        super().__init__(msg)
1✔
1061

1062
    def format_events(self, events: list[dict]) -> str:
1✔
1063
        formatted_events = []
1✔
1064

1065
        chronological_events = sorted(events, key=lambda event: event["Timestamp"])
1✔
1066
        for event in chronological_events:
1✔
1067
            if event["ResourceStatus"].endswith("FAILED") or config.CFN_VERBOSE_ERRORS:
1✔
1068
                formatted_events.append(self.format_event(event))
1✔
1069

1070
        return "\n".join(formatted_events)
1✔
1071

1072
    @staticmethod
1✔
1073
    def format_event(event: dict) -> str:
1✔
1074
        if reason := event.get("ResourceStatusReason"):
1✔
1075
            reason = reason.replace("\n", "; ")
1✔
1076
            return f"- {event['LogicalResourceId']} ({event['ResourceType']}) -> {event['ResourceStatus']} ({reason})"
1✔
1077
        else:
1078
            return f"- {event['LogicalResourceId']} ({event['ResourceType']}) -> {event['ResourceStatus']}"
1✔
1079

1080

1081
@pytest.fixture
1✔
1082
def deploy_cfn_template(
1✔
1083
    aws_client: ServiceLevelClientFactory,
1084
):
1085
    state: list[tuple[str, Callable]] = []
1✔
1086

1087
    def _deploy(
1✔
1088
        *,
1089
        is_update: bool | None = False,
1090
        stack_name: str | None = None,
1091
        change_set_name: str | None = None,
1092
        template: str | None = None,
1093
        template_path: str | os.PathLike | None = None,
1094
        template_mapping: dict[str, Any] | None = None,
1095
        parameters: dict[str, str] | None = None,
1096
        role_arn: str | None = None,
1097
        max_wait: int | None = None,
1098
        delay_between_polls: int | None = 2,
1099
        custom_aws_client: ServiceLevelClientFactory | None = None,
1100
        raw_parameters: list[Parameter] | None = None,
1101
    ) -> DeployResult:
1102
        if is_update:
1✔
1103
            assert stack_name
1✔
1104
        stack_name = stack_name or f"stack-{short_uid()}"
1✔
1105
        change_set_name = change_set_name or f"change-set-{short_uid()}"
1✔
1106

1107
        if max_wait is None:
1✔
1108
            max_wait = 1800 if is_aws_cloud() else 180
1✔
1109

1110
        if template_path is not None:
1✔
1111
            template = load_template_file(template_path)
1✔
1112
            if template is None:
1✔
1113
                raise RuntimeError(f"Could not find file {os.path.realpath(template_path)}")
×
1114
        template_rendered = render_template(template, **(template_mapping or {}))
1✔
1115

1116
        kwargs = CreateChangeSetInput(
1✔
1117
            StackName=stack_name,
1118
            ChangeSetName=change_set_name,
1119
            TemplateBody=template_rendered,
1120
            Capabilities=["CAPABILITY_AUTO_EXPAND", "CAPABILITY_IAM", "CAPABILITY_NAMED_IAM"],
1121
            ChangeSetType=("UPDATE" if is_update else "CREATE"),
1122
        )
1123
        kwargs["Parameters"] = []
1✔
1124
        if parameters:
1✔
1125
            kwargs["Parameters"] = [
1✔
1126
                Parameter(ParameterKey=k, ParameterValue=v) for (k, v) in parameters.items()
1127
            ]
1128
        elif raw_parameters:
1✔
1129
            kwargs["Parameters"] = raw_parameters
×
1130

1131
        if role_arn is not None:
1✔
1132
            kwargs["RoleARN"] = role_arn
×
1133

1134
        cfn_aws_client = custom_aws_client if custom_aws_client is not None else aws_client
1✔
1135

1136
        response = cfn_aws_client.cloudformation.create_change_set(**kwargs)
1✔
1137

1138
        change_set_id = response["Id"]
1✔
1139
        stack_id = response["StackId"]
1✔
1140

1141
        try:
1✔
1142
            cfn_aws_client.cloudformation.get_waiter(WAITER_CHANGE_SET_CREATE_COMPLETE).wait(
1✔
1143
                ChangeSetName=change_set_id
1144
            )
1145
        except botocore.exceptions.WaiterError as e:
×
1146
            change_set = cfn_aws_client.cloudformation.describe_change_set(
×
1147
                ChangeSetName=change_set_id
1148
            )
1149
            raise Exception(f"{change_set['Status']}: {change_set.get('StatusReason')}") from e
×
1150

1151
        cfn_aws_client.cloudformation.execute_change_set(ChangeSetName=change_set_id)
1✔
1152
        stack_waiter = cfn_aws_client.cloudformation.get_waiter(
1✔
1153
            WAITER_STACK_UPDATE_COMPLETE if is_update else WAITER_STACK_CREATE_COMPLETE
1154
        )
1155

1156
        try:
1✔
1157
            stack_waiter.wait(
1✔
1158
                StackName=stack_id,
1159
                WaiterConfig={
1160
                    "Delay": delay_between_polls,
1161
                    "MaxAttempts": max_wait / delay_between_polls,
1162
                },
1163
            )
1164
        except botocore.exceptions.WaiterError as e:
1✔
1165
            raise StackDeployError(
1✔
1166
                cfn_aws_client.cloudformation.describe_stacks(StackName=stack_id)["Stacks"][0],
1167
                cfn_aws_client.cloudformation.describe_stack_events(StackName=stack_id)[
1168
                    "StackEvents"
1169
                ],
1170
            ) from e
1171

1172
        describe_stack_res = cfn_aws_client.cloudformation.describe_stacks(StackName=stack_id)[
1✔
1173
            "Stacks"
1174
        ][0]
1175
        outputs = describe_stack_res.get("Outputs", [])
1✔
1176

1177
        mapped_outputs = {o["OutputKey"]: o.get("OutputValue") for o in outputs}
1✔
1178

1179
        def _destroy_stack():
1✔
1180
            cfn_aws_client.cloudformation.delete_stack(StackName=stack_id)
1✔
1181
            cfn_aws_client.cloudformation.get_waiter(WAITER_STACK_DELETE_COMPLETE).wait(
1✔
1182
                StackName=stack_id,
1183
                WaiterConfig={
1184
                    "Delay": delay_between_polls,
1185
                    "MaxAttempts": max_wait / delay_between_polls,
1186
                },
1187
            )
1188

1189
        state.append((stack_id, _destroy_stack))
1✔
1190

1191
        return DeployResult(
1✔
1192
            change_set_id, stack_id, stack_name, change_set_name, mapped_outputs, _destroy_stack
1193
        )
1194

1195
    yield _deploy
1✔
1196

1197
    # delete the stacks in the reverse order they were created in case of inter-stack dependencies
1198
    for stack_id, teardown in state[::-1]:
1✔
1199
        try:
1✔
1200
            teardown()
1✔
1201
        except Exception as e:
1✔
1202
            LOG.debug("Failed cleaning up stack stack_id=%s: %s", stack_id, e)
1✔
1203

1204

1205
@pytest.fixture
1✔
1206
def is_change_set_created_and_available(aws_client):
1✔
1207
    def _is_change_set_created_and_available(change_set_id: str):
1✔
1208
        def _inner():
1✔
1209
            change_set = aws_client.cloudformation.describe_change_set(ChangeSetName=change_set_id)
1✔
1210
            return (
1✔
1211
                # TODO: CREATE_FAILED should also not lead to further retries
1212
                change_set.get("Status") == "CREATE_COMPLETE"
1213
                and change_set.get("ExecutionStatus") == "AVAILABLE"
1214
            )
1215

1216
        return _inner
1✔
1217

1218
    return _is_change_set_created_and_available
1✔
1219

1220

1221
@pytest.fixture
1✔
1222
def is_change_set_failed_and_unavailable(aws_client):
1✔
1223
    def _is_change_set_created_and_available(change_set_id: str):
×
1224
        def _inner():
×
1225
            change_set = aws_client.cloudformation.describe_change_set(ChangeSetName=change_set_id)
×
1226
            return (
×
1227
                # TODO: CREATE_FAILED should also not lead to further retries
1228
                change_set.get("Status") == "FAILED"
1229
                and change_set.get("ExecutionStatus") == "UNAVAILABLE"
1230
            )
1231

1232
        return _inner
×
1233

1234
    return _is_change_set_created_and_available
×
1235

1236

1237
@pytest.fixture
1✔
1238
def is_stack_created(aws_client):
1✔
1239
    return _has_stack_status(aws_client.cloudformation, ["CREATE_COMPLETE", "CREATE_FAILED"])
1✔
1240

1241

1242
@pytest.fixture
1✔
1243
def is_stack_updated(aws_client):
1✔
1244
    return _has_stack_status(aws_client.cloudformation, ["UPDATE_COMPLETE", "UPDATE_FAILED"])
×
1245

1246

1247
@pytest.fixture
1✔
1248
def is_stack_deleted(aws_client):
1✔
1249
    return _has_stack_status(aws_client.cloudformation, ["DELETE_COMPLETE"])
×
1250

1251

1252
def _has_stack_status(cfn_client, statuses: list[str]):
1✔
1253
    def _has_status(stack_id: str):
1✔
1254
        def _inner():
1✔
1255
            resp = cfn_client.describe_stacks(StackName=stack_id)
1✔
1256
            s = resp["Stacks"][0]  # since the lookup  uses the id we can only get a single response
1✔
1257
            return s.get("StackStatus") in statuses
1✔
1258

1259
        return _inner
1✔
1260

1261
    return _has_status
1✔
1262

1263

1264
@pytest.fixture
1✔
1265
def is_change_set_finished(aws_client):
1✔
1266
    def _is_change_set_finished(change_set_id: str, stack_name: str | None = None):
1✔
1267
        def _inner():
×
1268
            kwargs = {"ChangeSetName": change_set_id}
×
1269
            if stack_name:
×
1270
                kwargs["StackName"] = stack_name
×
1271

1272
            check_set = aws_client.cloudformation.describe_change_set(**kwargs)
×
1273

1274
            if check_set.get("ExecutionStatus") == "EXECUTE_FAILED":
×
1275
                LOG.warning("Change set failed")
×
1276
                raise ShortCircuitWaitException()
×
1277

1278
            return check_set.get("ExecutionStatus") == "EXECUTE_COMPLETE"
×
1279

1280
        return _inner
×
1281

1282
    return _is_change_set_finished
1✔
1283

1284

1285
@pytest.fixture
1✔
1286
def wait_until_lambda_ready(aws_client):
1✔
1287
    def _wait_until_ready(function_name: str, qualifier: str = None, client=None):
1✔
1288
        client = client or aws_client.lambda_
1✔
1289

1290
        def _is_not_pending():
1✔
1291
            kwargs = {}
1✔
1292
            if qualifier:
1✔
1293
                kwargs["Qualifier"] = qualifier
×
1294
            try:
1✔
1295
                result = (
1✔
1296
                    client.get_function(FunctionName=function_name)["Configuration"]["State"]
1297
                    != "Pending"
1298
                )
1299
                LOG.debug("lambda state result: result=%s", result)
1✔
1300
                return result
1✔
1301
            except Exception as e:
×
1302
                LOG.error(e)
×
1303
                raise
×
1304

1305
        wait_until(_is_not_pending)
1✔
1306

1307
    return _wait_until_ready
1✔
1308

1309

1310
role_assume_policy = """
1✔
1311
{
1312
  "Version": "2012-10-17",
1313
  "Statement": [
1314
    {
1315
      "Effect": "Allow",
1316
      "Principal": {
1317
        "Service": "lambda.amazonaws.com"
1318
      },
1319
      "Action": "sts:AssumeRole"
1320
    }
1321
  ]
1322
}
1323
""".strip()
1324

1325
role_policy = """
1✔
1326
{
1327
    "Version": "2012-10-17",
1328
    "Statement": [
1329
        {
1330
            "Effect": "Allow",
1331
            "Action": [
1332
                "logs:CreateLogGroup",
1333
                "logs:CreateLogStream",
1334
                "logs:PutLogEvents"
1335
            ],
1336
            "Resource": [
1337
                "*"
1338
            ]
1339
        }
1340
    ]
1341
}
1342
""".strip()
1343

1344

1345
@pytest.fixture
1✔
1346
def create_lambda_function_aws(aws_client):
1✔
1347
    lambda_arns = []
1✔
1348

1349
    def _create_lambda_function(**kwargs):
1✔
1350
        def _create_function():
1✔
1351
            resp = aws_client.lambda_.create_function(**kwargs)
1✔
1352
            lambda_arns.append(resp["FunctionArn"])
1✔
1353

1354
            def _is_not_pending():
1✔
1355
                try:
1✔
1356
                    result = (
1✔
1357
                        aws_client.lambda_.get_function(FunctionName=resp["FunctionName"])[
1358
                            "Configuration"
1359
                        ]["State"]
1360
                        != "Pending"
1361
                    )
1362
                    return result
1✔
1363
                except Exception as e:
×
1364
                    LOG.error(e)
×
1365
                    raise
×
1366

1367
            wait_until(_is_not_pending)
1✔
1368
            return resp
1✔
1369

1370
        # @AWS, takes about 10s until the role/policy is "active", until then it will fail
1371
        # localstack should normally not require the retries and will just continue here
1372
        return retry(_create_function, retries=3, sleep=4)
1✔
1373

1374
    yield _create_lambda_function
1✔
1375

1376
    for arn in lambda_arns:
1✔
1377
        try:
1✔
1378
            aws_client.lambda_.delete_function(FunctionName=arn)
1✔
1379
        except Exception:
×
1380
            LOG.debug("Unable to delete function arn=%s in cleanup", arn)
×
1381

1382

1383
@pytest.fixture
1✔
1384
def create_lambda_function(aws_client, wait_until_lambda_ready, lambda_su_role):
1✔
1385
    lambda_arns_and_clients = []
1✔
1386
    log_groups = []
1✔
1387
    lambda_client = aws_client.lambda_
1✔
1388
    logs_client = aws_client.logs
1✔
1389
    s3_client = aws_client.s3
1✔
1390

1391
    def _create_lambda_function(*args, **kwargs):
1✔
1392
        client = kwargs.get("client") or lambda_client
1✔
1393
        kwargs["client"] = client
1✔
1394
        kwargs["s3_client"] = s3_client
1✔
1395
        func_name = kwargs.get("func_name")
1✔
1396
        assert func_name
1✔
1397
        del kwargs["func_name"]
1✔
1398

1399
        if not kwargs.get("role"):
1✔
1400
            kwargs["role"] = lambda_su_role
1✔
1401

1402
        def _create_function():
1✔
1403
            resp = testutil.create_lambda_function(func_name, **kwargs)
1✔
1404
            lambda_arns_and_clients.append((resp["CreateFunctionResponse"]["FunctionArn"], client))
1✔
1405
            wait_until_lambda_ready(function_name=func_name, client=client)
1✔
1406
            log_group_name = f"/aws/lambda/{func_name}"
1✔
1407
            log_groups.append(log_group_name)
1✔
1408
            return resp
1✔
1409

1410
        # @AWS, takes about 10s until the role/policy is "active", until then it will fail
1411
        # localstack should normally not require the retries and will just continue here
1412
        return retry(_create_function, retries=3, sleep=4)
1✔
1413

1414
    yield _create_lambda_function
1✔
1415

1416
    for arn, client in lambda_arns_and_clients:
1✔
1417
        try:
1✔
1418
            client.delete_function(FunctionName=arn)
1✔
1419
        except Exception:
1✔
1420
            LOG.debug("Unable to delete function arn=%s in cleanup", arn)
1✔
1421

1422
    for log_group_name in log_groups:
1✔
1423
        try:
1✔
1424
            logs_client.delete_log_group(logGroupName=log_group_name)
1✔
1425
        except Exception:
1✔
1426
            LOG.debug("Unable to delete log group %s in cleanup", log_group_name)
1✔
1427

1428

1429
@pytest.fixture
1✔
1430
def create_echo_http_server(aws_client, create_lambda_function):
1✔
1431
    from localstack.aws.api.lambda_ import Runtime
1✔
1432

1433
    lambda_client = aws_client.lambda_
1✔
1434
    handler_code = textwrap.dedent(
1✔
1435
        """
1436
    import json
1437
    import os
1438

1439

1440
    def make_response(body: dict, status_code: int = 200):
1441
        return {
1442
            "statusCode": status_code,
1443
            "headers": {"Content-Type": "application/json"},
1444
            "body": body,
1445
        }
1446

1447

1448
    def trim_headers(headers):
1449
        if not int(os.getenv("TRIM_X_HEADERS", 0)):
1450
            return headers
1451
        return {
1452
            key: value for key, value in headers.items()
1453
            if not (key.startswith("x-amzn") or key.startswith("x-forwarded-"))
1454
        }
1455

1456

1457
    def handler(event, context):
1458
        print(json.dumps(event))
1459
        response = {
1460
            "args": event.get("queryStringParameters", {}),
1461
            "data": event.get("body", ""),
1462
            "domain": event["requestContext"].get("domainName", ""),
1463
            "headers": trim_headers(event.get("headers", {})),
1464
            "method": event["requestContext"]["http"].get("method", ""),
1465
            "origin": event["requestContext"]["http"].get("sourceIp", ""),
1466
            "path": event["requestContext"]["http"].get("path", ""),
1467
        }
1468
        return make_response(response)"""
1469
    )
1470

1471
    def _create_echo_http_server(trim_x_headers: bool = False) -> str:
1✔
1472
        """Creates a server that will echo any request. Any request will be returned with the
1473
        following format. Any unset values will have those defaults.
1474
        `trim_x_headers` can be set to True to trim some headers that are automatically added by lambda in
1475
        order to create easier Snapshot testing. Default: `False`
1476
        {
1477
          "args": {},
1478
          "headers": {},
1479
          "data": "",
1480
          "method": "",
1481
          "domain": "",
1482
          "origin": "",
1483
          "path": ""
1484
        }"""
1485
        zip_file = testutil.create_lambda_archive(handler_code, get_content=True)
1✔
1486
        func_name = f"echo-http-{short_uid()}"
1✔
1487
        create_lambda_function(
1✔
1488
            func_name=func_name,
1489
            zip_file=zip_file,
1490
            runtime=Runtime.python3_12,
1491
            envvars={"TRIM_X_HEADERS": "1" if trim_x_headers else "0"},
1492
        )
1493
        url_response = lambda_client.create_function_url_config(
1✔
1494
            FunctionName=func_name, AuthType="NONE"
1495
        )
1496
        aws_client.lambda_.add_permission(
1✔
1497
            FunctionName=func_name,
1498
            StatementId="urlPermission",
1499
            Action="lambda:InvokeFunctionUrl",
1500
            Principal="*",
1501
            FunctionUrlAuthType="NONE",
1502
        )
1503
        return url_response["FunctionUrl"]
1✔
1504

1505
    yield _create_echo_http_server
1✔
1506

1507

1508
@pytest.fixture
1✔
1509
def create_event_source_mapping(aws_client):
1✔
1510
    uuids = []
1✔
1511

1512
    def _create_event_source_mapping(*args, **kwargs):
1✔
1513
        response = aws_client.lambda_.create_event_source_mapping(*args, **kwargs)
1✔
1514
        uuids.append(response["UUID"])
1✔
1515
        return response
1✔
1516

1517
    yield _create_event_source_mapping
1✔
1518

1519
    for uuid in uuids:
1✔
1520
        try:
1✔
1521
            aws_client.lambda_.delete_event_source_mapping(UUID=uuid)
1✔
1522
        except Exception:
×
1523
            LOG.debug("Unable to delete event source mapping %s in cleanup", uuid)
×
1524

1525

1526
@pytest.fixture
1✔
1527
def check_lambda_logs(aws_client):
1✔
1528
    def _check_logs(func_name: str, expected_lines: list[str] = None) -> list[str]:
1✔
1529
        if not expected_lines:
1✔
1530
            expected_lines = []
×
1531
        log_events = get_lambda_logs(func_name, logs_client=aws_client.logs)
1✔
1532
        log_messages = [e["message"] for e in log_events]
1✔
1533
        for line in expected_lines:
1✔
1534
            if ".*" in line:
1✔
1535
                found = [re.match(line, m, flags=re.DOTALL) for m in log_messages]
1✔
1536
                if any(found):
1✔
1537
                    continue
1✔
1538
            assert line in log_messages
×
1539
        return log_messages
1✔
1540

1541
    return _check_logs
1✔
1542

1543

1544
@pytest.fixture
1✔
1545
def create_policy(aws_client):
1✔
1546
    policy_arns = []
1✔
1547

1548
    def _create_policy(*args, iam_client=None, **kwargs):
1✔
1549
        iam_client = iam_client or aws_client.iam
1✔
1550
        if "PolicyName" not in kwargs:
1✔
1551
            kwargs["PolicyName"] = f"policy-{short_uid()}"
1✔
1552
        response = iam_client.create_policy(*args, **kwargs)
1✔
1553
        policy_arn = response["Policy"]["Arn"]
1✔
1554
        policy_arns.append((policy_arn, iam_client))
1✔
1555
        return response
1✔
1556

1557
    yield _create_policy
1✔
1558

1559
    for policy_arn, iam_client in policy_arns:
1✔
1560
        try:
1✔
1561
            iam_client.delete_policy(PolicyArn=policy_arn)
1✔
1562
        except Exception:
1✔
1563
            LOG.debug("Could not delete policy '%s' during test cleanup", policy_arn)
1✔
1564

1565

1566
@pytest.fixture
1✔
1567
def create_user(aws_client):
1✔
1568
    usernames = []
1✔
1569

1570
    def _create_user(**kwargs):
1✔
1571
        if "UserName" not in kwargs:
1✔
1572
            kwargs["UserName"] = f"user-{short_uid()}"
×
1573
        response = aws_client.iam.create_user(**kwargs)
1✔
1574
        usernames.append(response["User"]["UserName"])
1✔
1575
        return response
1✔
1576

1577
    yield _create_user
1✔
1578

1579
    for username in usernames:
1✔
1580
        try:
1✔
1581
            inline_policies = aws_client.iam.list_user_policies(UserName=username)["PolicyNames"]
1✔
1582
        except ClientError as e:
1✔
1583
            LOG.debug(
1✔
1584
                "Cannot list user policies: %s. User %s probably already deleted...", e, username
1585
            )
1586
            continue
1✔
1587

1588
        for inline_policy in inline_policies:
1✔
1589
            try:
1✔
1590
                aws_client.iam.delete_user_policy(UserName=username, PolicyName=inline_policy)
1✔
1591
            except Exception:
×
1592
                LOG.debug(
×
1593
                    "Could not delete user policy '%s' from '%s' during cleanup",
1594
                    inline_policy,
1595
                    username,
1596
                )
1597
        attached_policies = aws_client.iam.list_attached_user_policies(UserName=username)[
1✔
1598
            "AttachedPolicies"
1599
        ]
1600
        for attached_policy in attached_policies:
1✔
1601
            try:
1✔
1602
                aws_client.iam.detach_user_policy(
1✔
1603
                    UserName=username, PolicyArn=attached_policy["PolicyArn"]
1604
                )
1605
            except Exception:
1✔
1606
                LOG.debug(
1✔
1607
                    "Error detaching policy '%s' from user '%s'",
1608
                    attached_policy["PolicyArn"],
1609
                    username,
1610
                )
1611
        access_keys = aws_client.iam.list_access_keys(UserName=username)["AccessKeyMetadata"]
1✔
1612
        for access_key in access_keys:
1✔
1613
            try:
1✔
1614
                aws_client.iam.delete_access_key(
1✔
1615
                    UserName=username, AccessKeyId=access_key["AccessKeyId"]
1616
                )
1617
            except Exception:
×
1618
                LOG.debug(
×
1619
                    "Error deleting access key '%s' from user '%s'",
1620
                    access_key["AccessKeyId"],
1621
                    username,
1622
                )
1623

1624
        try:
1✔
1625
            aws_client.iam.delete_user(UserName=username)
1✔
1626
        except Exception as e:
1✔
1627
            LOG.debug("Error deleting user '%s' during test cleanup: %s", username, e)
1✔
1628

1629

1630
@pytest.fixture
1✔
1631
def wait_and_assume_role(aws_client):
1✔
1632
    def _wait_and_assume_role(role_arn: str, session_name: str = None, **kwargs):
1✔
1633
        if not session_name:
1✔
1634
            session_name = f"session-{short_uid()}"
1✔
1635

1636
        def assume_role():
1✔
1637
            return aws_client.sts.assume_role(
1✔
1638
                RoleArn=role_arn, RoleSessionName=session_name, **kwargs
1639
            )["Credentials"]
1640

1641
        # need to retry a couple of times before we are allowed to assume this role in AWS
1642
        keys = retry(assume_role, sleep=5, retries=4)
1✔
1643
        return keys
1✔
1644

1645
    return _wait_and_assume_role
1✔
1646

1647

1648
@pytest.fixture
1✔
1649
def create_role(aws_client):
1✔
1650
    role_names = []
1✔
1651

1652
    def _create_role(iam_client=None, **kwargs):
1✔
1653
        if not kwargs.get("RoleName"):
1✔
1654
            kwargs["RoleName"] = f"role-{short_uid()}"
×
1655
        iam_client = iam_client or aws_client.iam
1✔
1656
        result = iam_client.create_role(**kwargs)
1✔
1657
        role_names.append((result["Role"]["RoleName"], iam_client))
1✔
1658
        return result
1✔
1659

1660
    yield _create_role
1✔
1661

1662
    for role_name, iam_client in role_names:
1✔
1663
        # detach policies
1664
        try:
1✔
1665
            attached_policies = iam_client.list_attached_role_policies(RoleName=role_name)[
1✔
1666
                "AttachedPolicies"
1667
            ]
1668
        except ClientError as e:
1✔
1669
            LOG.debug(
1✔
1670
                "Cannot list attached role policies: %s. Role %s probably already deleted...",
1671
                e,
1672
                role_name,
1673
            )
1674
            continue
1✔
1675
        for attached_policy in attached_policies:
1✔
1676
            try:
1✔
1677
                iam_client.detach_role_policy(
1✔
1678
                    RoleName=role_name, PolicyArn=attached_policy["PolicyArn"]
1679
                )
1680
            except Exception:
×
1681
                LOG.debug(
×
1682
                    "Could not detach role policy '%s' from '%s' during cleanup",
1683
                    attached_policy["PolicyArn"],
1684
                    role_name,
1685
                )
1686
        role_policies = iam_client.list_role_policies(RoleName=role_name)["PolicyNames"]
1✔
1687
        for role_policy in role_policies:
1✔
1688
            try:
1✔
1689
                iam_client.delete_role_policy(RoleName=role_name, PolicyName=role_policy)
1✔
1690
            except Exception:
×
1691
                LOG.debug(
×
1692
                    "Could not delete role policy '%s' from '%s' during cleanup",
1693
                    role_policy,
1694
                    role_name,
1695
                )
1696
        try:
1✔
1697
            iam_client.delete_role(RoleName=role_name)
1✔
1698
        except Exception:
×
1699
            LOG.debug("Could not delete role '%s' during cleanup", role_name)
×
1700

1701

1702
@pytest.fixture
1✔
1703
def create_parameter(aws_client):
1✔
1704
    params = []
1✔
1705

1706
    def _create_parameter(**kwargs):
1✔
1707
        params.append(kwargs["Name"])
1✔
1708
        return aws_client.ssm.put_parameter(**kwargs)
1✔
1709

1710
    yield _create_parameter
1✔
1711

1712
    for param in params:
1✔
1713
        aws_client.ssm.delete_parameter(Name=param)
1✔
1714

1715

1716
@pytest.fixture
1✔
1717
def create_secret(aws_client):
1✔
1718
    items = []
1✔
1719

1720
    def _create_parameter(**kwargs):
1✔
1721
        create_response = aws_client.secretsmanager.create_secret(**kwargs)
1✔
1722
        items.append(create_response["ARN"])
1✔
1723
        return create_response
1✔
1724

1725
    yield _create_parameter
1✔
1726

1727
    for item in items:
1✔
1728
        aws_client.secretsmanager.delete_secret(SecretId=item, ForceDeleteWithoutRecovery=True)
1✔
1729

1730

1731
# TODO Figure out how to make cert creation tests pass against AWS.
1732
#
1733
# We would like to have localstack tests to pass not just against localstack, but also against AWS to make sure
1734
# our emulation is correct. Unfortunately, with certificate creation there are some issues.
1735
#
1736
# In AWS newly created ACM certificates have to be validated either by email or by DNS. The latter is
1737
# by adding some CNAME records as requested by ASW in response to a certificate request.
1738
# For testing purposes the DNS one seems to be easier, at least as long as DNS is handled by Region53 AWS DNS service.
1739
#
1740
# The other possible option is to use IAM certificates instead of ACM ones. Those just have to be uploaded from files
1741
# created by openssl etc. Not sure if there are other issues after that.
1742
#
1743
# The third option might be having in AWS some certificates created in advance - so they do not require validation
1744
# and can be easily used in tests. The issie with such an approach is that for AppSync, for example, in order to
1745
# register a domain name (https://docs.aws.amazon.com/appsync/latest/APIReference/API_CreateDomainName.html),
1746
# the domain name in the API request has to match the domain name used in certificate creation. Which means that with
1747
# pre-created certificates we would have to use specific domain names instead of random ones.
1748
@pytest.fixture
1✔
1749
def acm_request_certificate(aws_client_factory):
1✔
1750
    certificate_arns = []
1✔
1751

1752
    def factory(**kwargs) -> str:
1✔
1753
        if "DomainName" not in kwargs:
1✔
1754
            kwargs["DomainName"] = f"test-domain-{short_uid()}.localhost.localstack.cloud"
1✔
1755

1756
        region_name = kwargs.pop("region_name", None)
1✔
1757
        acm_client = aws_client_factory(region_name=region_name).acm
1✔
1758

1759
        response = acm_client.request_certificate(**kwargs)
1✔
1760
        created_certificate_arn = response["CertificateArn"]
1✔
1761
        certificate_arns.append((created_certificate_arn, region_name))
1✔
1762
        return response
1✔
1763

1764
    yield factory
1✔
1765

1766
    # cleanup
1767
    for certificate_arn, region_name in certificate_arns:
1✔
1768
        try:
1✔
1769
            acm_client = aws_client_factory(region_name=region_name).acm
1✔
1770
            acm_client.delete_certificate(CertificateArn=certificate_arn)
1✔
1771
        except Exception as e:
×
1772
            LOG.debug("error cleaning up certificate %s: %s", certificate_arn, e)
×
1773

1774

1775
role_policy_su = {
1✔
1776
    "Version": "2012-10-17",
1777
    "Statement": [{"Effect": "Allow", "Action": ["*"], "Resource": ["*"]}],
1778
}
1779

1780

1781
@pytest.fixture(scope="session")
1✔
1782
def lambda_su_role(aws_client):
1✔
1783
    role_name = f"lambda-autogenerated-{short_uid()}"
1✔
1784
    role = aws_client.iam.create_role(
1✔
1785
        RoleName=role_name, AssumeRolePolicyDocument=role_assume_policy
1786
    )["Role"]
1787
    policy_name = f"lambda-autogenerated-{short_uid()}"
1✔
1788
    policy_arn = aws_client.iam.create_policy(
1✔
1789
        PolicyName=policy_name, PolicyDocument=json.dumps(role_policy_su)
1790
    )["Policy"]["Arn"]
1791
    aws_client.iam.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
1✔
1792

1793
    if is_aws_cloud():  # dirty but necessary
1✔
1794
        time.sleep(10)
×
1795

1796
    yield role["Arn"]
1✔
1797

1798
    run_safe(aws_client.iam.detach_role_policy(RoleName=role_name, PolicyArn=policy_arn))
1✔
1799
    run_safe(aws_client.iam.delete_role(RoleName=role_name))
1✔
1800
    run_safe(aws_client.iam.delete_policy(PolicyArn=policy_arn))
1✔
1801

1802

1803
@pytest.fixture
1✔
1804
def create_iam_role_and_attach_policy(aws_client):
1✔
1805
    """
1806
    Fixture that creates an IAM role with given role definition and predefined policy ARN.
1807

1808
    Use this fixture with AWS managed policies like 'AmazonS3ReadOnlyAccess' or 'AmazonKinesisFullAccess'.
1809
    """
1810
    roles = []
×
1811

1812
    def _inner(**kwargs: dict[str, any]) -> str:
×
1813
        """
1814
        :param dict RoleDefinition: role definition document
1815
        :param str PolicyArn: policy ARN
1816
        :param str RoleName: role name (autogenerated if omitted)
1817
        :return: role ARN
1818
        """
1819
        if "RoleName" not in kwargs:
×
1820
            kwargs["RoleName"] = f"test-role-{short_uid()}"
×
1821

1822
        role = kwargs["RoleName"]
×
1823
        role_policy = json.dumps(kwargs["RoleDefinition"])
×
1824

1825
        result = aws_client.iam.create_role(RoleName=role, AssumeRolePolicyDocument=role_policy)
×
1826
        role_arn = result["Role"]["Arn"]
×
1827

1828
        policy_arn = kwargs["PolicyArn"]
×
1829
        aws_client.iam.attach_role_policy(PolicyArn=policy_arn, RoleName=role)
×
1830

1831
        roles.append(role)
×
1832
        return role_arn
×
1833

1834
    yield _inner
×
1835

1836
    for role in roles:
×
1837
        try:
×
1838
            aws_client.iam.delete_role(RoleName=role)
×
1839
        except Exception as exc:
×
1840
            LOG.debug("Error deleting IAM role '%s': %s", role, exc)
×
1841

1842

1843
@pytest.fixture
1✔
1844
def create_iam_role_with_policy(aws_client):
1✔
1845
    """
1846
    Fixture that creates an IAM role with given role definition and policy definition.
1847
    """
1848
    roles = {}
1✔
1849

1850
    def _create_role_and_policy(**kwargs: dict[str, any]) -> str:
1✔
1851
        """
1852
        :param dict RoleDefinition: role definition document
1853
        :param dict PolicyDefinition: policy definition document
1854
        :param str PolicyName: policy name (autogenerated if omitted)
1855
        :param str RoleName: role name (autogenerated if omitted)
1856
        :return: role ARN
1857
        """
1858
        if "RoleName" not in kwargs:
1✔
1859
            kwargs["RoleName"] = f"test-role-{short_uid()}"
1✔
1860
        role = kwargs["RoleName"]
1✔
1861
        if "PolicyName" not in kwargs:
1✔
1862
            kwargs["PolicyName"] = f"test-policy-{short_uid()}"
1✔
1863
        policy = kwargs["PolicyName"]
1✔
1864
        role_policy = json.dumps(kwargs["RoleDefinition"])
1✔
1865

1866
        result = aws_client.iam.create_role(RoleName=role, AssumeRolePolicyDocument=role_policy)
1✔
1867
        role_arn = result["Role"]["Arn"]
1✔
1868

1869
        policy_document = json.dumps(kwargs["PolicyDefinition"])
1✔
1870
        aws_client.iam.put_role_policy(
1✔
1871
            RoleName=role, PolicyName=policy, PolicyDocument=policy_document
1872
        )
1873
        roles[role] = policy
1✔
1874
        return role_arn
1✔
1875

1876
    yield _create_role_and_policy
1✔
1877

1878
    for role_name, policy_name in roles.items():
1✔
1879
        try:
1✔
1880
            aws_client.iam.delete_role_policy(RoleName=role_name, PolicyName=policy_name)
1✔
1881
        except Exception as exc:
×
1882
            LOG.debug("Error deleting IAM role policy '%s' '%s': %s", role_name, policy_name, exc)
×
1883
        try:
1✔
1884
            aws_client.iam.delete_role(RoleName=role_name)
1✔
1885
        except Exception as exc:
×
1886
            LOG.debug("Error deleting IAM role '%s': %s", role_name, exc)
×
1887

1888

1889
@pytest.fixture
1✔
1890
def firehose_create_delivery_stream(wait_for_delivery_stream_ready, aws_client):
1✔
1891
    delivery_stream_names = []
1✔
1892

1893
    def _create_delivery_stream(**kwargs):
1✔
1894
        if "DeliveryStreamName" not in kwargs:
1✔
1895
            kwargs["DeliveryStreamName"] = f"test-delivery-stream-{short_uid()}"
×
1896
        delivery_stream_name = kwargs["DeliveryStreamName"]
1✔
1897
        response = aws_client.firehose.create_delivery_stream(**kwargs)
1✔
1898
        delivery_stream_names.append(delivery_stream_name)
1✔
1899
        wait_for_delivery_stream_ready(delivery_stream_name)
1✔
1900
        return response
1✔
1901

1902
    yield _create_delivery_stream
1✔
1903

1904
    for delivery_stream_name in delivery_stream_names:
1✔
1905
        try:
1✔
1906
            aws_client.firehose.delete_delivery_stream(DeliveryStreamName=delivery_stream_name)
1✔
1907
        except Exception:
×
1908
            LOG.info("Failed to delete delivery stream %s", delivery_stream_name)
×
1909

1910

1911
@pytest.fixture
1✔
1912
def ses_configuration_set(aws_client):
1✔
1913
    configuration_set_names = []
1✔
1914

1915
    def factory(name: str) -> None:
1✔
1916
        aws_client.ses.create_configuration_set(
1✔
1917
            ConfigurationSet={
1918
                "Name": name,
1919
            },
1920
        )
1921
        configuration_set_names.append(name)
1✔
1922

1923
    yield factory
1✔
1924

1925
    for configuration_set_name in configuration_set_names:
1✔
1926
        aws_client.ses.delete_configuration_set(ConfigurationSetName=configuration_set_name)
1✔
1927

1928

1929
@pytest.fixture
1✔
1930
def ses_configuration_set_sns_event_destination(aws_client):
1✔
1931
    event_destinations = []
1✔
1932

1933
    def factory(config_set_name: str, event_destination_name: str, topic_arn: str) -> None:
1✔
1934
        aws_client.ses.create_configuration_set_event_destination(
1✔
1935
            ConfigurationSetName=config_set_name,
1936
            EventDestination={
1937
                "Name": event_destination_name,
1938
                "Enabled": True,
1939
                "MatchingEventTypes": ["send", "bounce", "delivery", "open", "click"],
1940
                "SNSDestination": {
1941
                    "TopicARN": topic_arn,
1942
                },
1943
            },
1944
        )
1945
        event_destinations.append((config_set_name, event_destination_name))
1✔
1946

1947
    yield factory
1✔
1948

1949
    for created_config_set_name, created_event_destination_name in event_destinations:
1✔
1950
        aws_client.ses.delete_configuration_set_event_destination(
1✔
1951
            ConfigurationSetName=created_config_set_name,
1952
            EventDestinationName=created_event_destination_name,
1953
        )
1954

1955

1956
@pytest.fixture
1✔
1957
def ses_email_template(aws_client):
1✔
1958
    template_names = []
1✔
1959

1960
    def factory(name: str, contents: str, subject: str = f"Email template {short_uid()}"):
1✔
1961
        aws_client.ses.create_template(
1✔
1962
            Template={
1963
                "TemplateName": name,
1964
                "SubjectPart": subject,
1965
                "TextPart": contents,
1966
            }
1967
        )
1968
        template_names.append(name)
1✔
1969

1970
    yield factory
1✔
1971

1972
    for template_name in template_names:
1✔
1973
        aws_client.ses.delete_template(TemplateName=template_name)
1✔
1974

1975

1976
@pytest.fixture
1✔
1977
def ses_verify_identity(aws_client):
1✔
1978
    identities = []
1✔
1979

1980
    def factory(email_address: str) -> None:
1✔
1981
        aws_client.ses.verify_email_identity(EmailAddress=email_address)
1✔
1982

1983
    yield factory
1✔
1984

1985
    for identity in identities:
1✔
1986
        aws_client.ses.delete_identity(Identity=identity)
×
1987

1988

1989
@pytest.fixture
1✔
1990
def setup_sender_email_address(ses_verify_identity):
1✔
1991
    """
1992
    If the test is running against AWS then assume the email address passed is already
1993
    verified, and passes the given email address through. Otherwise, it generates one random
1994
    email address and verify them.
1995
    """
1996

1997
    def inner(sender_email_address: str | None = None) -> str:
1✔
1998
        if is_aws_cloud():
1✔
1999
            if sender_email_address is None:
×
2000
                raise ValueError(
×
2001
                    "sender_email_address must be specified to run this test against AWS"
2002
                )
2003
        else:
2004
            # overwrite the given parameters with localstack specific ones
2005
            sender_email_address = f"sender-{short_uid()}@example.com"
1✔
2006
            ses_verify_identity(sender_email_address)
1✔
2007

2008
        return sender_email_address
1✔
2009

2010
    return inner
1✔
2011

2012

2013
@pytest.fixture
1✔
2014
def ec2_create_security_group(aws_client):
1✔
2015
    ec2_sgs = []
1✔
2016

2017
    def factory(ports=None, ip_protocol: str = "tcp", **kwargs):
1✔
2018
        """
2019
        Create the target group and authorize the security group ingress.
2020
        :param ports: list of ports to be authorized for the ingress rule.
2021
        :param ip_protocol: the ip protocol for the permissions (tcp by default)
2022
        """
2023
        if "GroupName" not in kwargs:
1✔
2024
            # FIXME: This will fail against AWS since the sg prefix is not valid for GroupName
2025
            # > "Group names may not be in the format sg-*".
2026
            kwargs["GroupName"] = f"sg-{short_uid()}"
×
2027
        # Making sure the call to CreateSecurityGroup gets the right arguments
2028
        _args = select_from_typed_dict(CreateSecurityGroupRequest, kwargs)
1✔
2029
        security_group = aws_client.ec2.create_security_group(**_args)
1✔
2030
        security_group_id = security_group["GroupId"]
1✔
2031

2032
        # FIXME: If 'ports' is None or an empty list, authorize_security_group_ingress will fail due to missing IpPermissions.
2033
        # Must ensure ports are explicitly provided or skip authorization entirely if not required.
2034
        permissions = [
1✔
2035
            {
2036
                "FromPort": port,
2037
                "IpProtocol": ip_protocol,
2038
                "IpRanges": [{"CidrIp": "0.0.0.0/0"}],
2039
                "ToPort": port,
2040
            }
2041
            for port in ports or []
2042
        ]
2043
        aws_client.ec2.authorize_security_group_ingress(
1✔
2044
            GroupId=security_group_id,
2045
            IpPermissions=permissions,
2046
        )
2047

2048
        ec2_sgs.append(security_group_id)
1✔
2049
        return security_group
1✔
2050

2051
    yield factory
1✔
2052

2053
    for sg_group_id in ec2_sgs:
1✔
2054
        try:
1✔
2055
            aws_client.ec2.delete_security_group(GroupId=sg_group_id)
1✔
2056
        except Exception as e:
×
2057
            LOG.debug("Error cleaning up EC2 security group: %s, %s", sg_group_id, e)
×
2058

2059

2060
@pytest.fixture
1✔
2061
def ec2_create_vpc_endpoint(aws_client):
1✔
2062
    vpc_endpoints = []
1✔
2063

2064
    def _create(**kwargs: Unpack[CreateVpcEndpointRequest]) -> VpcEndpoint:
1✔
2065
        endpoint = aws_client.ec2.create_vpc_endpoint(**kwargs)
1✔
2066
        endpoint_id = endpoint["VpcEndpoint"]["VpcEndpointId"]
1✔
2067
        vpc_endpoints.append(endpoint_id)
1✔
2068

2069
        def _check_available() -> VpcEndpoint:
1✔
2070
            result = aws_client.ec2.describe_vpc_endpoints(VpcEndpointIds=[endpoint_id])
1✔
2071
            _endpoint_details = result["VpcEndpoints"][0]
1✔
2072
            assert _endpoint_details["State"] == "available"
1✔
2073

2074
            return _endpoint_details
1✔
2075

2076
        return retry(_check_available, retries=30, sleep=5 if is_aws_cloud() else 1)
1✔
2077

2078
    yield _create
1✔
2079

2080
    try:
1✔
2081
        aws_client.ec2.delete_vpc_endpoints(VpcEndpointIds=vpc_endpoints)
1✔
2082
    except Exception as e:
×
2083
        LOG.error("Error cleaning up VPC endpoint: %s, %s", vpc_endpoints, e)
×
2084

2085
    def wait_for_endpoint_deleted():
1✔
2086
        try:
×
2087
            endpoints = aws_client.ec2.describe_vpc_endpoints(VpcEndpointIds=vpc_endpoints)
×
2088
            assert len(endpoints["VpcEndpoints"]) == 0 or all(
×
2089
                endpoint["State"] == "Deleted" for endpoint in endpoints["VpcEndpoints"]
2090
            )
2091
        except botocore.exceptions.ClientError:
×
2092
            pass
×
2093

2094
    # the vpc can't be deleted if an endpoint exists
2095
    if is_aws_cloud():
1✔
2096
        retry(wait_for_endpoint_deleted, retries=30, sleep=10 if is_aws_cloud() else 1)
×
2097

2098

2099
@pytest.fixture
1✔
2100
def cleanups():
1✔
2101
    cleanup_fns = []
1✔
2102

2103
    yield cleanup_fns
1✔
2104

2105
    for cleanup_callback in cleanup_fns[::-1]:
1✔
2106
        try:
1✔
2107
            cleanup_callback()
1✔
2108
        except Exception as e:
1✔
2109
            LOG.warning("Failed to execute cleanup", exc_info=e)
1✔
2110

2111

2112
@pytest.fixture(scope="session")
1✔
2113
def account_id(aws_client):
1✔
2114
    if is_aws_cloud() or is_api_enabled("sts"):
1✔
2115
        return aws_client.sts.get_caller_identity()["Account"]
1✔
2116
    else:
2117
        return TEST_AWS_ACCOUNT_ID
×
2118

2119

2120
@pytest.fixture(scope="session")
1✔
2121
def region_name(aws_client):
1✔
2122
    if is_aws_cloud() or is_api_enabled("sts"):
1✔
2123
        return aws_client.sts.meta.region_name
1✔
2124
    else:
2125
        return TEST_AWS_REGION_NAME
×
2126

2127

2128
@pytest.fixture(scope="session")
1✔
2129
def partition(region_name):
1✔
2130
    return get_partition(region_name)
1✔
2131

2132

2133
@pytest.fixture(scope="session")
1✔
2134
def secondary_account_id(secondary_aws_client):
1✔
2135
    if is_aws_cloud() or is_api_enabled("sts"):
1✔
2136
        return secondary_aws_client.sts.get_caller_identity()["Account"]
1✔
2137
    else:
2138
        return SECONDARY_TEST_AWS_ACCOUNT_ID
×
2139

2140

2141
@pytest.fixture(scope="session")
1✔
2142
def secondary_region_name():
1✔
2143
    return SECONDARY_TEST_AWS_REGION_NAME
1✔
2144

2145

2146
@pytest.hookimpl
1✔
2147
def pytest_collection_modifyitems(config: Config, items: list[Item]):
1✔
2148
    only_localstack = pytest.mark.skipif(
1✔
2149
        is_aws_cloud(),
2150
        reason="test only applicable if run against localstack",
2151
    )
2152
    for item in items:
1✔
2153
        for mark in item.iter_markers():
1✔
2154
            if mark.name.endswith("only_localstack"):
1✔
2155
                item.add_marker(only_localstack)
1✔
2156
        if hasattr(item, "fixturenames") and "snapshot" in item.fixturenames:
1✔
2157
            # add a marker that indicates that this test is snapshot validated
2158
            # if it uses the snapshot fixture -> allows selecting only snapshot
2159
            # validated tests in order to capture new snapshots for a whole
2160
            # test file with "-m snapshot_validated"
2161
            item.add_marker("snapshot_validated")
1✔
2162

2163

2164
@pytest.fixture
1✔
2165
def sample_stores() -> AccountRegionBundle:
1✔
2166
    class SampleStore(BaseStore):
1✔
2167
        CROSS_ACCOUNT_ATTR = CrossAccountAttribute(default=list)
1✔
2168
        CROSS_REGION_ATTR = CrossRegionAttribute(default=list)
1✔
2169
        region_specific_attr = LocalAttribute(default=list)
1✔
2170

2171
    return AccountRegionBundle("zzz", SampleStore, validate=False)
1✔
2172

2173

2174
@pytest.fixture
1✔
2175
def create_rest_apigw(aws_client_factory):
1✔
2176
    rest_apis = []
1✔
2177
    retry_boto_config = None
1✔
2178
    if is_aws_cloud():
1✔
2179
        retry_boto_config = botocore.config.Config(
×
2180
            # Api gateway can throttle requests pretty heavily. Leading to potentially undeleted apis
2181
            retries={"max_attempts": 10, "mode": "adaptive"}
2182
        )
2183

2184
    def _create_apigateway_function(**kwargs):
1✔
2185
        client_region_name = kwargs.pop("region_name", None)
1✔
2186
        apigateway_client = aws_client_factory(
1✔
2187
            region_name=client_region_name, config=retry_boto_config
2188
        ).apigateway
2189
        kwargs.setdefault("name", f"api-{short_uid()}")
1✔
2190

2191
        response = apigateway_client.create_rest_api(**kwargs)
1✔
2192
        api_id = response.get("id")
1✔
2193
        rest_apis.append((api_id, client_region_name))
1✔
2194

2195
        return api_id, response.get("name"), response.get("rootResourceId")
1✔
2196

2197
    yield _create_apigateway_function
1✔
2198

2199
    for rest_api_id, _client_region_name in rest_apis:
1✔
2200
        apigateway_client = aws_client_factory(
1✔
2201
            region_name=_client_region_name,
2202
            config=retry_boto_config,
2203
        ).apigateway
2204
        # First, retrieve the usage plans associated with the REST API
2205
        usage_plan_ids = []
1✔
2206
        usage_plans = apigateway_client.get_usage_plans()
1✔
2207
        for item in usage_plans.get("items", []):
1✔
2208
            api_stages = item.get("apiStages", [])
1✔
2209
            usage_plan_ids.extend(
1✔
2210
                item.get("id") for api_stage in api_stages if api_stage.get("apiId") == rest_api_id
2211
            )
2212

2213
        # Then delete the API, as you can't delete the UsagePlan if a stage is associated with it
2214
        with contextlib.suppress(Exception):
1✔
2215
            apigateway_client.delete_rest_api(restApiId=rest_api_id)
1✔
2216

2217
        # finally delete the usage plans and the API Keys linked to it
2218
        for usage_plan_id in usage_plan_ids:
1✔
2219
            usage_plan_keys = apigateway_client.get_usage_plan_keys(usagePlanId=usage_plan_id)
1✔
2220
            for key in usage_plan_keys.get("items", []):
1✔
2221
                apigateway_client.delete_api_key(apiKey=key["id"])
×
2222
            apigateway_client.delete_usage_plan(usagePlanId=usage_plan_id)
1✔
2223

2224

2225
@pytest.fixture
1✔
2226
def create_rest_apigw_openapi(aws_client_factory):
1✔
2227
    rest_apis = []
×
2228

2229
    def _create_apigateway_function(**kwargs):
×
2230
        region_name = kwargs.pop("region_name", None)
×
2231
        apigateway_client = aws_client_factory(region_name=region_name).apigateway
×
2232

2233
        response = apigateway_client.import_rest_api(**kwargs)
×
2234
        api_id = response.get("id")
×
2235
        rest_apis.append((api_id, region_name))
×
2236
        return api_id, response
×
2237

2238
    yield _create_apigateway_function
×
2239

2240
    for rest_api_id, region_name in rest_apis:
×
2241
        with contextlib.suppress(Exception):
×
2242
            apigateway_client = aws_client_factory(region_name=region_name).apigateway
×
2243
            apigateway_client.delete_rest_api(restApiId=rest_api_id)
×
2244

2245

2246
@pytest.fixture
1✔
2247
def assert_host_customisation(monkeypatch):
1✔
2248
    localstack_host = "foo.bar"
1✔
2249
    monkeypatch.setattr(
1✔
2250
        config, "LOCALSTACK_HOST", config.HostAndPort(host=localstack_host, port=8888)
2251
    )
2252

2253
    def asserter(
1✔
2254
        url: str,
2255
        *,
2256
        custom_host: str | None = None,
2257
    ):
2258
        if custom_host is not None:
1✔
2259
            assert custom_host in url, f"Could not find `{custom_host}` in `{url}`"
×
2260

2261
            assert localstack_host not in url
×
2262
        else:
2263
            assert localstack_host in url, f"Could not find `{localstack_host}` in `{url}`"
1✔
2264

2265
    yield asserter
1✔
2266

2267

2268
@pytest.fixture
1✔
2269
def echo_http_server(httpserver: HTTPServer):
1✔
2270
    """Spins up a local HTTP echo server and returns the endpoint URL"""
2271

2272
    def _echo(request: Request) -> Response:
1✔
2273
        request_json = None
1✔
2274
        if request.is_json:
1✔
2275
            with contextlib.suppress(ValueError):
1✔
2276
                request_json = json.loads(request.data)
1✔
2277
        result = {
1✔
2278
            "data": request.data or "{}",
2279
            "headers": dict(request.headers),
2280
            "url": request.url,
2281
            "method": request.method,
2282
            "json": request_json,
2283
        }
2284
        response_body = json.dumps(json_safe(result))
1✔
2285
        return Response(response_body, status=200)
1✔
2286

2287
    httpserver.expect_request("").respond_with_handler(_echo)
1✔
2288
    http_endpoint = httpserver.url_for("/")
1✔
2289

2290
    return http_endpoint
1✔
2291

2292

2293
@pytest.fixture
1✔
2294
def echo_http_server_post(echo_http_server):
1✔
2295
    """
2296
    Returns an HTTP echo server URL for POST requests that work both locally and for parity tests (against real AWS)
2297
    """
2298
    if is_aws_cloud():
1✔
2299
        return f"{PUBLIC_HTTP_ECHO_SERVER_URL}/post"
×
2300

2301
    return f"{echo_http_server}post"
1✔
2302

2303

2304
def create_policy_doc(effect: str, actions: list, resource=None) -> dict:
1✔
2305
    actions = ensure_list(actions)
1✔
2306
    resource = resource or "*"
1✔
2307
    return {
1✔
2308
        "Version": "2012-10-17",
2309
        "Statement": [
2310
            {
2311
                # TODO statement ids have to be alphanumeric [0-9A-Za-z], write a test for it
2312
                "Sid": f"s{short_uid()}",
2313
                "Effect": effect,
2314
                "Action": actions,
2315
                "Resource": resource,
2316
            }
2317
        ],
2318
    }
2319

2320

2321
@pytest.fixture
1✔
2322
def create_policy_generated_document(create_policy):
1✔
2323
    def _create_policy_with_doc(effect, actions, policy_name=None, resource=None, iam_client=None):
1✔
2324
        policy_name = policy_name or f"p-{short_uid()}"
1✔
2325
        policy = create_policy_doc(effect, actions, resource=resource)
1✔
2326
        response = create_policy(
1✔
2327
            PolicyName=policy_name, PolicyDocument=json.dumps(policy), iam_client=iam_client
2328
        )
2329
        policy_arn = response["Policy"]["Arn"]
1✔
2330
        return policy_arn
1✔
2331

2332
    return _create_policy_with_doc
1✔
2333

2334

2335
@pytest.fixture
1✔
2336
def create_role_with_policy(create_role, create_policy_generated_document, aws_client):
1✔
2337
    def _create_role_with_policy(
1✔
2338
        effect, actions, assume_policy_doc, resource=None, attach=True, iam_client=None
2339
    ):
2340
        iam_client = iam_client or aws_client.iam
1✔
2341

2342
        role_name = f"role-{short_uid()}"
1✔
2343
        result = create_role(
1✔
2344
            RoleName=role_name, AssumeRolePolicyDocument=assume_policy_doc, iam_client=iam_client
2345
        )
2346
        role_arn = result["Role"]["Arn"]
1✔
2347
        policy_name = f"p-{short_uid()}"
1✔
2348

2349
        if attach:
1✔
2350
            # create role and attach role policy
2351
            policy_arn = create_policy_generated_document(
1✔
2352
                effect, actions, policy_name=policy_name, resource=resource, iam_client=iam_client
2353
            )
2354
            iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
1✔
2355
        else:
2356
            # put role policy
2357
            policy_document = create_policy_doc(effect, actions, resource=resource)
1✔
2358
            policy_document = json.dumps(policy_document)
1✔
2359
            iam_client.put_role_policy(
1✔
2360
                RoleName=role_name, PolicyName=policy_name, PolicyDocument=policy_document
2361
            )
2362

2363
        return role_name, role_arn
1✔
2364

2365
    return _create_role_with_policy
1✔
2366

2367

2368
@pytest.fixture
1✔
2369
def create_user_with_policy(create_policy_generated_document, create_user, aws_client):
1✔
2370
    def _create_user_with_policy(effect, actions, resource=None):
×
2371
        policy_arn = create_policy_generated_document(effect, actions, resource=resource)
×
2372
        username = f"user-{short_uid()}"
×
2373
        create_user(UserName=username)
×
2374
        aws_client.iam.attach_user_policy(UserName=username, PolicyArn=policy_arn)
×
2375
        keys = aws_client.iam.create_access_key(UserName=username)["AccessKey"]
×
2376
        return username, keys
×
2377

2378
    return _create_user_with_policy
×
2379

2380

2381
@pytest.fixture()
1✔
2382
def register_extension(s3_bucket, aws_client):
1✔
2383
    cfn_client = aws_client.cloudformation
×
2384
    extensions_arns = []
×
2385

2386
    def _register(extension_name, extension_type, artifact_path):
×
2387
        bucket = s3_bucket
×
2388
        key = f"artifact-{short_uid()}"
×
2389

2390
        aws_client.s3.upload_file(artifact_path, bucket, key)
×
2391

2392
        register_response = cfn_client.register_type(
×
2393
            Type=extension_type,
2394
            TypeName=extension_name,
2395
            SchemaHandlerPackage=f"s3://{bucket}/{key}",
2396
        )
2397

2398
        registration_token = register_response["RegistrationToken"]
×
2399
        cfn_client.get_waiter("type_registration_complete").wait(
×
2400
            RegistrationToken=registration_token
2401
        )
2402

2403
        describe_response = cfn_client.describe_type_registration(
×
2404
            RegistrationToken=registration_token
2405
        )
2406

2407
        extensions_arns.append(describe_response["TypeArn"])
×
2408
        cfn_client.set_type_default_version(Arn=describe_response["TypeVersionArn"])
×
2409

2410
        return describe_response
×
2411

2412
    yield _register
×
2413

2414
    for arn in extensions_arns:
×
2415
        versions = cfn_client.list_type_versions(Arn=arn)["TypeVersionSummaries"]
×
2416
        for v in versions:
×
2417
            try:
×
2418
                cfn_client.deregister_type(Arn=v["Arn"])
×
2419
            except Exception:
×
2420
                continue
×
2421
        cfn_client.deregister_type(Arn=arn)
×
2422

2423

2424
@pytest.fixture
1✔
2425
def hosted_zone(aws_client):
1✔
2426
    zone_ids = []
1✔
2427

2428
    def factory(**kwargs):
1✔
2429
        if "CallerReference" not in kwargs:
1✔
2430
            kwargs["CallerReference"] = f"ref-{short_uid()}"
1✔
2431
        response = aws_client.route53.create_hosted_zone(**kwargs)
1✔
2432
        zone_id = response["HostedZone"]["Id"]
1✔
2433
        zone_ids.append(zone_id)
1✔
2434
        return response
1✔
2435

2436
    yield factory
1✔
2437

2438
    for zone_id in zone_ids[::-1]:
1✔
2439
        aws_client.route53.delete_hosted_zone(Id=zone_id)
1✔
2440

2441

2442
@pytest.fixture
1✔
2443
def openapi_validate(monkeypatch):
1✔
2444
    monkeypatch.setattr(config, "OPENAPI_VALIDATE_RESPONSE", "true")
1✔
2445
    monkeypatch.setattr(config, "OPENAPI_VALIDATE_REQUEST", "true")
1✔
2446

2447

2448
@pytest.fixture
1✔
2449
def set_resource_custom_id():
1✔
2450
    set_ids = []
1✔
2451

2452
    def _set_custom_id(resource_identifier: ResourceIdentifier, custom_id):
1✔
2453
        localstack_id_manager.set_custom_id(
1✔
2454
            resource_identifier=resource_identifier, custom_id=custom_id
2455
        )
2456
        set_ids.append(resource_identifier)
1✔
2457

2458
    yield _set_custom_id
1✔
2459

2460
    for resource_identifier in set_ids:
1✔
2461
        localstack_id_manager.unset_custom_id(resource_identifier)
1✔
2462

2463

2464
###############################
2465
# Events (EventBridge) fixtures
2466
###############################
2467

2468

2469
@pytest.fixture
1✔
2470
def events_create_event_bus(aws_client, region_name, account_id):
1✔
2471
    event_bus_names = []
1✔
2472

2473
    def _create_event_bus(**kwargs):
1✔
2474
        if "Name" not in kwargs:
1✔
2475
            kwargs["Name"] = f"test-event-bus-{short_uid()}"
×
2476

2477
        response = aws_client.events.create_event_bus(**kwargs)
1✔
2478
        event_bus_names.append(kwargs["Name"])
1✔
2479
        return response
1✔
2480

2481
    yield _create_event_bus
1✔
2482

2483
    for event_bus_name in event_bus_names:
1✔
2484
        try:
1✔
2485
            response = aws_client.events.list_rules(EventBusName=event_bus_name)
1✔
2486
            rules = [rule["Name"] for rule in response["Rules"]]
1✔
2487

2488
            # Delete all rules for the current event bus
2489
            for rule in rules:
1✔
2490
                try:
1✔
2491
                    response = aws_client.events.list_targets_by_rule(
1✔
2492
                        Rule=rule, EventBusName=event_bus_name
2493
                    )
2494
                    targets = [target["Id"] for target in response["Targets"]]
1✔
2495

2496
                    # Remove all targets for the current rule
2497
                    if targets:
1✔
2498
                        for target in targets:
1✔
2499
                            aws_client.events.remove_targets(
1✔
2500
                                Rule=rule, EventBusName=event_bus_name, Ids=[target]
2501
                            )
2502

2503
                    aws_client.events.delete_rule(Name=rule, EventBusName=event_bus_name)
1✔
2504
                except Exception as e:
×
2505
                    LOG.warning("Failed to delete rule %s: %s", rule, e)
×
2506

2507
            # Delete archives for event bus
2508
            event_source_arn = (
1✔
2509
                f"arn:aws:events:{region_name}:{account_id}:event-bus/{event_bus_name}"
2510
            )
2511
            response = aws_client.events.list_archives(EventSourceArn=event_source_arn)
1✔
2512
            archives = [archive["ArchiveName"] for archive in response["Archives"]]
1✔
2513
            for archive in archives:
1✔
2514
                try:
1✔
2515
                    aws_client.events.delete_archive(ArchiveName=archive)
1✔
2516
                except Exception as e:
×
2517
                    LOG.warning("Failed to delete archive %s: %s", archive, e)
×
2518

2519
            aws_client.events.delete_event_bus(Name=event_bus_name)
1✔
2520
        except Exception as e:
1✔
2521
            LOG.warning("Failed to delete event bus %s: %s", event_bus_name, e)
1✔
2522

2523

2524
@pytest.fixture
1✔
2525
def events_put_rule(aws_client):
1✔
2526
    rules = []
1✔
2527

2528
    def _put_rule(**kwargs):
1✔
2529
        if "Name" not in kwargs:
1✔
2530
            kwargs["Name"] = f"rule-{short_uid()}"
×
2531

2532
        response = aws_client.events.put_rule(**kwargs)
1✔
2533
        rules.append((kwargs["Name"], kwargs.get("EventBusName", "default")))
1✔
2534
        return response
1✔
2535

2536
    yield _put_rule
1✔
2537

2538
    for rule, event_bus_name in rules:
1✔
2539
        try:
1✔
2540
            response = aws_client.events.list_targets_by_rule(
1✔
2541
                Rule=rule, EventBusName=event_bus_name
2542
            )
2543
            targets = [target["Id"] for target in response["Targets"]]
1✔
2544

2545
            # Remove all targets for the current rule
2546
            if targets:
1✔
2547
                for target in targets:
1✔
2548
                    aws_client.events.remove_targets(
1✔
2549
                        Rule=rule, EventBusName=event_bus_name, Ids=[target]
2550
                    )
2551

2552
            aws_client.events.delete_rule(Name=rule, EventBusName=event_bus_name)
1✔
2553
        except Exception as e:
1✔
2554
            LOG.warning("Failed to delete rule %s: %s", rule, e)
1✔
2555

2556

2557
@pytest.fixture
1✔
2558
def events_create_rule(aws_client):
1✔
2559
    rules = []
1✔
2560

2561
    def _create_rule(**kwargs):
1✔
2562
        rule_name = kwargs["Name"]
1✔
2563
        bus_name = kwargs.get("EventBusName", "")
1✔
2564
        pattern = kwargs.get("EventPattern", {})
1✔
2565
        schedule = kwargs.get("ScheduleExpression", "")
1✔
2566
        rule_arn = aws_client.events.put_rule(
1✔
2567
            Name=rule_name,
2568
            EventBusName=bus_name,
2569
            EventPattern=json.dumps(pattern),
2570
            ScheduleExpression=schedule,
2571
        )["RuleArn"]
2572
        rules.append({"name": rule_name, "bus": bus_name})
1✔
2573
        return rule_arn
1✔
2574

2575
    yield _create_rule
1✔
2576

2577
    for rule in rules:
1✔
2578
        targets = aws_client.events.list_targets_by_rule(
1✔
2579
            Rule=rule["name"], EventBusName=rule["bus"]
2580
        )["Targets"]
2581

2582
        targetIds = [target["Id"] for target in targets]
1✔
2583
        if len(targetIds) > 0:
1✔
2584
            aws_client.events.remove_targets(
1✔
2585
                Rule=rule["name"], EventBusName=rule["bus"], Ids=targetIds
2586
            )
2587

2588
        aws_client.events.delete_rule(Name=rule["name"], EventBusName=rule["bus"])
1✔
2589

2590

2591
@pytest.fixture
1✔
2592
def sqs_as_events_target(aws_client, sqs_get_queue_arn):
1✔
2593
    """
2594
    Fixture that creates an SQS queue and sets it up as a target for EventBridge events.
2595
    """
2596
    queue_urls = []
1✔
2597

2598
    def _sqs_as_events_target(
1✔
2599
        queue_name: str | None = None, custom_aws_client=None
2600
    ) -> tuple[str, str]:
2601
        if not queue_name:
1✔
2602
            queue_name = f"tests-queue-{short_uid()}"
1✔
2603
        if custom_aws_client:
1✔
2604
            sqs_client = custom_aws_client.sqs
×
2605
        else:
2606
            sqs_client = aws_client.sqs
1✔
2607
        queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]
1✔
2608
        queue_urls.append((queue_url, sqs_client))
1✔
2609
        queue_arn = sqs_get_queue_arn(queue_url)
1✔
2610
        policy = {
1✔
2611
            "Version": "2012-10-17",
2612
            "Id": f"sqs-eventbridge-{short_uid()}",
2613
            "Statement": [
2614
                {
2615
                    "Sid": f"SendMessage-{short_uid()}",
2616
                    "Effect": "Allow",
2617
                    "Principal": {"Service": "events.amazonaws.com"},
2618
                    "Action": "sqs:SendMessage",
2619
                    "Resource": queue_arn,
2620
                }
2621
            ],
2622
        }
2623
        sqs_client.set_queue_attributes(
1✔
2624
            QueueUrl=queue_url, Attributes={"Policy": json.dumps(policy)}
2625
        )
2626
        return queue_url, queue_arn, queue_name
1✔
2627

2628
    yield _sqs_as_events_target
1✔
2629

2630
    for queue_url, sqs_client in queue_urls:
1✔
2631
        try:
1✔
2632
            sqs_client.delete_queue(QueueUrl=queue_url)
1✔
2633
        except Exception as e:
×
2634
            LOG.debug("error cleaning up queue %s: %s", queue_url, e)
×
2635

2636

2637
@pytest.fixture
1✔
2638
def create_role_event_bus_source_to_bus_target(create_iam_role_with_policy):
1✔
2639
    def _create_role_event_bus_to_bus():
1✔
2640
        assume_role_policy_document_bus_source_to_bus_target = {
1✔
2641
            "Version": "2012-10-17",
2642
            "Statement": [
2643
                {
2644
                    "Effect": "Allow",
2645
                    "Principal": {"Service": "events.amazonaws.com"},
2646
                    "Action": "sts:AssumeRole",
2647
                }
2648
            ],
2649
        }
2650

2651
        policy_document_bus_source_to_bus_target = {
1✔
2652
            "Version": "2012-10-17",
2653
            "Statement": [
2654
                {
2655
                    "Sid": "",
2656
                    "Effect": "Allow",
2657
                    "Action": "events:PutEvents",
2658
                    "Resource": "arn:aws:events:*:*:event-bus/*",
2659
                }
2660
            ],
2661
        }
2662

2663
        role_arn_bus_source_to_bus_target = create_iam_role_with_policy(
1✔
2664
            RoleDefinition=assume_role_policy_document_bus_source_to_bus_target,
2665
            PolicyDefinition=policy_document_bus_source_to_bus_target,
2666
        )
2667

2668
        return role_arn_bus_source_to_bus_target
1✔
2669

2670
    yield _create_role_event_bus_to_bus
1✔
2671

2672

2673
@pytest.fixture
1✔
2674
def get_primary_secondary_client(
1✔
2675
    aws_client_factory,
2676
    secondary_aws_client_factory,
2677
    region_name,
2678
    secondary_region_name,
2679
    account_id,
2680
    secondary_account_id,
2681
):
2682
    def _get_primary_secondary_clients(cross_scenario: str):
1✔
2683
        """
2684
        Returns primary and secondary AWS clients based on the cross-scenario.
2685
        :param cross_scenario: The scenario for cross-region or cross-account testing.
2686
                               Options: "region", "account", "region_account"
2687
                               account_region cross scenario is not supported by AWS
2688
        :return: A dictionary containing primary and secondary AWS clients, and their respective region and account IDs.
2689
        """
2690
        secondary_region = secondary_region_name
1✔
2691
        secondary_account = secondary_account_id
1✔
2692
        if cross_scenario not in ["region", "account", "region_account"]:
1✔
2693
            raise ValueError(f"cross_scenario {cross_scenario} not supported")
×
2694

2695
        primary_client = aws_client_factory(region_name=region_name)
1✔
2696

2697
        if cross_scenario == "region":
1✔
2698
            secondary_account = account_id
1✔
2699
            secondary_client = aws_client_factory(region_name=secondary_region_name)
1✔
2700

2701
        elif cross_scenario == "account":
1✔
2702
            secondary_region = region_name
1✔
2703
            secondary_client = secondary_aws_client_factory(region_name=region_name)
1✔
2704

2705
        elif cross_scenario == "region_account":
1✔
2706
            secondary_client = secondary_aws_client_factory(region_name=secondary_region)
1✔
2707

2708
        return {
1✔
2709
            "primary_aws_client": primary_client,
2710
            "secondary_aws_client": secondary_client,
2711
            "secondary_region_name": secondary_region,
2712
            "secondary_account_id": secondary_account,
2713
        }
2714

2715
    return _get_primary_secondary_clients
1✔
2716

2717

2718
@pytest.fixture
1✔
2719
def clean_up(
1✔
2720
    aws_client,
2721
):  # TODO: legacy clean up fixtures for eventbridge - remove and use individual fixtures for creating resources instead
2722
    def _clean_up(
1✔
2723
        bus_name=None,
2724
        rule_name=None,
2725
        target_ids=None,
2726
        queue_url=None,
2727
        log_group_name=None,
2728
    ):
2729
        events_client = aws_client.events
1✔
2730
        kwargs = {"EventBusName": bus_name} if bus_name else {}
1✔
2731
        if target_ids:
1✔
2732
            target_ids = target_ids if isinstance(target_ids, list) else [target_ids]
1✔
2733
            call_safe(
1✔
2734
                events_client.remove_targets,
2735
                kwargs=dict(Rule=rule_name, Ids=target_ids, Force=True, **kwargs),
2736
            )
2737
        if rule_name:
1✔
2738
            call_safe(events_client.delete_rule, kwargs=dict(Name=rule_name, Force=True, **kwargs))
1✔
2739
        if bus_name:
1✔
2740
            call_safe(events_client.delete_event_bus, kwargs={"Name": bus_name})
×
2741
        if queue_url:
1✔
2742
            sqs_client = aws_client.sqs
×
2743
            call_safe(sqs_client.delete_queue, kwargs={"QueueUrl": queue_url})
×
2744
        if log_group_name:
1✔
2745
            logs_client = aws_client.logs
×
2746

2747
            def _delete_log_group():
×
2748
                log_streams = logs_client.describe_log_streams(logGroupName=log_group_name)
×
2749
                for log_stream in log_streams["logStreams"]:
×
2750
                    logs_client.delete_log_stream(
×
2751
                        logGroupName=log_group_name, logStreamName=log_stream["logStreamName"]
2752
                    )
2753
                logs_client.delete_log_group(logGroupName=log_group_name)
×
2754

2755
            call_safe(_delete_log_group)
×
2756

2757
    yield _clean_up
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc