• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 4ecb9222-1031-4b49-9473-440da7a54636

31 Mar 2025 04:19PM UTC coverage: 86.891% (-0.02%) from 86.913%
4ecb9222-1031-4b49-9473-440da7a54636

push

circleci

web-flow
CloudFormation: POC Support for Modeling of Outputs Blocks in the Update Graph, Improved Handling of Intrinsic Function Types (#12443)

159 of 177 new or added lines in 3 files covered. (89.83%)

84 existing lines in 17 files now uncovered.

63469 of 73044 relevant lines covered (86.89%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.47
/localstack-core/localstack/testing/pytest/fixtures.py
1
import contextlib
1✔
2
import dataclasses
1✔
3
import json
1✔
4
import logging
1✔
5
import os
1✔
6
import re
1✔
7
import textwrap
1✔
8
import time
1✔
9
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple
1✔
10

11
import botocore.auth
1✔
12
import botocore.config
1✔
13
import botocore.credentials
1✔
14
import botocore.session
1✔
15
import pytest
1✔
16
from _pytest.config import Config
1✔
17
from _pytest.nodes import Item
1✔
18
from botocore.exceptions import ClientError
1✔
19
from botocore.regions import EndpointResolver
1✔
20
from pytest_httpserver import HTTPServer
1✔
21
from werkzeug import Request, Response
1✔
22

23
from localstack import config
1✔
24
from localstack.aws.connect import ServiceLevelClientFactory
1✔
25
from localstack.services.stores import (
1✔
26
    AccountRegionBundle,
27
    BaseStore,
28
    CrossAccountAttribute,
29
    CrossRegionAttribute,
30
    LocalAttribute,
31
)
32
from localstack.testing.aws.cloudformation_utils import load_template_file, render_template
1✔
33
from localstack.testing.aws.util import get_lambda_logs, is_aws_cloud
1✔
34
from localstack.testing.config import (
1✔
35
    SECONDARY_TEST_AWS_ACCOUNT_ID,
36
    SECONDARY_TEST_AWS_REGION_NAME,
37
    TEST_AWS_ACCOUNT_ID,
38
    TEST_AWS_REGION_NAME,
39
)
40
from localstack.utils import testutil
1✔
41
from localstack.utils.aws.arns import get_partition
1✔
42
from localstack.utils.aws.client import SigningHttpClient
1✔
43
from localstack.utils.aws.resources import create_dynamodb_table
1✔
44
from localstack.utils.bootstrap import is_api_enabled
1✔
45
from localstack.utils.collections import ensure_list
1✔
46
from localstack.utils.functions import call_safe, run_safe
1✔
47
from localstack.utils.http import safe_requests as requests
1✔
48
from localstack.utils.id_generator import ResourceIdentifier, localstack_id_manager
1✔
49
from localstack.utils.json import CustomEncoder, json_safe
1✔
50
from localstack.utils.net import wait_for_port_open
1✔
51
from localstack.utils.strings import short_uid, to_str
1✔
52
from localstack.utils.sync import ShortCircuitWaitException, poll_condition, retry, wait_until
1✔
53

54
# Module-level logger for all fixtures in this file.
LOG = logging.getLogger(__name__)

# URL of public HTTP echo server, used primarily for AWS parity/snapshot testing
PUBLIC_HTTP_ECHO_SERVER_URL = "http://httpbin.org"

# Botocore waiter names used when working with CloudFormation stacks/change sets.
WAITER_CHANGE_SET_CREATE_COMPLETE = "change_set_create_complete"
WAITER_STACK_CREATE_COMPLETE = "stack_create_complete"
WAITER_STACK_UPDATE_COMPLETE = "stack_update_complete"
WAITER_STACK_DELETE_COMPLETE = "stack_delete_complete"


# Typing-only imports; mypy_boto3_sqs is not required at runtime.
if TYPE_CHECKING:
    from mypy_boto3_sqs import SQSClient
    from mypy_boto3_sqs.type_defs import MessageTypeDef
68

69

70
@pytest.fixture(scope="session")
def aws_client_no_retry(aws_client_factory):
    """
    Obtain Boto clients with retries disabled, for testing error behavior.
    botocore docs: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#configuring-a-retry-mode

    Use this client when testing exceptions (i.e., with pytest.raises(...)) or expected errors (e.g., status code 500)
    to avoid unnecessary retries and mitigate test flakiness if the tested error condition is time-bound.

    This client is needed for the following errors, exceptions, and HTTP status codes defined by the legacy retry mode:
    https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#legacy-retry-mode
    General socket/connection errors:
    * ConnectionError
    * ConnectionClosedError
    * ReadTimeoutError
    * EndpointConnectionError

    Service-side throttling/limit errors and exceptions:
    * Throttling
    * ThrottlingException
    * ThrottledException
    * RequestThrottledException
    * ProvisionedThroughputExceededException

    HTTP status codes: 429, 500, 502, 503, 504, and 509

    Hence, this client is not needed for a `ResourceNotFound` error (but it doesn't harm).
    """
    # max_attempts=1 means a single request with no retry attempts at all.
    single_attempt_config = botocore.config.Config(retries={"max_attempts": 1})
    return aws_client_factory(config=single_attempt_config)
100

101

102
@pytest.fixture(scope="class")
def aws_http_client_factory(aws_session):
    """
    Returns a factory for creating new ``SigningHttpClient`` instances using a configurable botocore request signer.
    The default signer is a SigV4QueryAuth. The credentials are taken from the ``aws_session`` fixture (which
    transparently uses your global profile when TEST_TARGET=AWS_CLOUD, or test credentials when running against
    LocalStack), unless an explicit access key pair is passed.

    Example invocations

        client = aws_http_client_factory("sqs")
        client.get("http://localhost:4566/000000000000/my-queue")

    or
        client = aws_http_client_factory("dynamodb", signer_factory=SigV4Auth)
        client.post("...")
    """

    def factory(
        service: str,
        region: str = None,
        signer_factory: Callable[
            [botocore.credentials.Credentials, str, str], botocore.auth.BaseSigner
        ] = botocore.auth.SigV4QueryAuth,
        endpoint_url: str = None,
        aws_access_key_id: str = None,
        aws_secret_access_key: str = None,
    ):
        region = region or TEST_AWS_REGION_NAME

        # Explicitly passed credentials take precedence over the session's credentials.
        if aws_access_key_id or aws_secret_access_key:
            credentials = botocore.credentials.Credentials(
                access_key=aws_access_key_id, secret_key=aws_secret_access_key
            )
        else:
            credentials = aws_session.get_credentials()
        frozen_creds = credentials.get_frozen_credentials()

        if not endpoint_url:
            if is_aws_cloud():
                # FIXME: this is a bit raw. we should probably re-use boto in a better way
                resolver: EndpointResolver = aws_session._session.get_component("endpoint_resolver")
                endpoint_url = "https://" + resolver.construct_endpoint(service, region)["hostname"]
            else:
                endpoint_url = config.internal_service_url()

        signer = signer_factory(frozen_creds, service, region)
        return SigningHttpClient(signer, endpoint_url=endpoint_url)

    return factory
152

153

154
@pytest.fixture(scope="class")
def s3_vhost_client(aws_client_factory, region_name):
    """Return an S3 client configured for virtual-host-style bucket addressing."""
    vhost_config = botocore.config.Config(s3={"addressing_style": "virtual"})
    return aws_client_factory(config=vhost_config, region_name=region_name).s3
159

160

161
@pytest.fixture
def dynamodb_wait_for_table_active(aws_client):
    """Return a waiter that polls (up to 30s) until a DynamoDB table is ACTIVE."""

    def wait_for_table_active(table_name: str, client=None):
        ddb_client = client or aws_client.dynamodb

        def _is_active():
            table = ddb_client.describe_table(TableName=table_name)["Table"]
            return table["TableStatus"] == "ACTIVE"

        poll_condition(_is_active, timeout=30)

    return wait_for_table_active
172

173

174
@pytest.fixture
def dynamodb_create_table_with_parameters(dynamodb_wait_for_table_active, aws_client):
    """Factory fixture creating DynamoDB tables from raw ``create_table`` kwargs.

    Generates a table name when none is given, waits until the table is ACTIVE,
    and deletes all created tables again on teardown.
    """
    created_table_names = []

    def factory(**kwargs):
        if "TableName" not in kwargs:
            kwargs["TableName"] = f"test-table-{short_uid()}"

        created_table_names.append(kwargs["TableName"])
        response = aws_client.dynamodb.create_table(**kwargs)
        dynamodb_wait_for_table_active(kwargs["TableName"])
        return response

    yield factory

    # cleanup: a table has to be in ACTIVE state before it can be deleted
    for table_name in created_table_names:
        try:
            dynamodb_wait_for_table_active(table_name)
            aws_client.dynamodb.delete_table(TableName=table_name)
        except Exception as e:
            LOG.debug("error cleaning up table %s: %s", table_name, e)
197

198

199
@pytest.fixture
def dynamodb_create_table(dynamodb_wait_for_table_active, aws_client):
    """Factory fixture creating DynamoDB tables via the ``create_dynamodb_table`` utility.

    Beware: this swallows exceptions raised inside the create_dynamodb_table utility.
    Created tables are deleted again on teardown.
    """
    created_table_names = []

    def factory(**kwargs):
        kwargs["client"] = aws_client.dynamodb
        if "table_name" not in kwargs:
            kwargs["table_name"] = f"test-table-{short_uid()}"
        if "partition_key" not in kwargs:
            kwargs["partition_key"] = "id"

        created_table_names.append(kwargs["table_name"])

        return create_dynamodb_table(**kwargs)

    yield factory

    # cleanup: a table has to be in ACTIVE state before it can be deleted
    for table_name in created_table_names:
        try:
            dynamodb_wait_for_table_active(table_name)
            aws_client.dynamodb.delete_table(TableName=table_name)
        except Exception as e:
            LOG.debug("error cleaning up table %s: %s", table_name, e)
225

226

227
@pytest.fixture
def s3_create_bucket(s3_empty_bucket, aws_client):
    """Factory fixture creating S3 buckets; empties and deletes them on teardown."""
    created_buckets = []

    def factory(**kwargs) -> str:
        if "Bucket" not in kwargs:
            kwargs["Bucket"] = "test-bucket-%s" % short_uid()

        # Outside us-east-1, create_bucket requires an explicit location constraint.
        client_region = aws_client.s3.meta.region_name
        if "CreateBucketConfiguration" not in kwargs and client_region != "us-east-1":
            kwargs["CreateBucketConfiguration"] = {"LocationConstraint": client_region}

        aws_client.s3.create_bucket(**kwargs)
        created_buckets.append(kwargs["Bucket"])
        return kwargs["Bucket"]

    yield factory

    # cleanup: a bucket must be empty before it can be deleted
    for bucket_name in created_buckets:
        try:
            s3_empty_bucket(bucket_name)
            aws_client.s3.delete_bucket(Bucket=bucket_name)
        except Exception as e:
            LOG.debug("error cleaning up bucket %s: %s", bucket_name, e)
256

257

258
@pytest.fixture
def s3_create_bucket_with_client(s3_empty_bucket, aws_client):
    """Factory fixture creating S3 buckets with a caller-supplied S3 client.

    Cleanup (emptying and deleting the buckets) uses the default ``aws_client``.
    """
    created_buckets = []

    def factory(s3_client, **kwargs) -> str:
        if "Bucket" not in kwargs:
            kwargs["Bucket"] = f"test-bucket-{short_uid()}"

        response = s3_client.create_bucket(**kwargs)
        created_buckets.append(kwargs["Bucket"])
        return response

    yield factory

    for bucket_name in created_buckets:
        try:
            s3_empty_bucket(bucket_name)
            aws_client.s3.delete_bucket(Bucket=bucket_name)
        except Exception as e:
            LOG.debug("error cleaning up bucket %s: %s", bucket_name, e)
279

280

281
@pytest.fixture
def s3_bucket(s3_create_bucket, aws_client) -> str:
    """Create one throwaway S3 bucket in the client's region and return its name."""
    client_region = aws_client.s3.meta.region_name
    create_kwargs = {}
    if client_region != "us-east-1":
        # Outside us-east-1, create_bucket requires an explicit location constraint.
        create_kwargs["CreateBucketConfiguration"] = {"LocationConstraint": client_region}
    return s3_create_bucket(**create_kwargs)
288

289

290
@pytest.fixture
def s3_empty_bucket(aws_client):
    """
    Returns a factory that, given a bucket name, deletes all objects and all object versions.
    """

    # Boto resource would make this a straightforward task, but our internal client does not support Boto resource
    # FIXME: this won't work when bucket has more than 1000 objects
    def factory(bucket_name: str):
        delete_kwargs = {}
        # If the bucket has object lock configured, deletes need to bypass
        # governance retention; a missing lock config raises ClientError.
        try:
            aws_client.s3.get_object_lock_configuration(Bucket=bucket_name)
            delete_kwargs["BypassGovernanceRetention"] = True
        except ClientError:
            pass

        listing = aws_client.s3.list_objects_v2(Bucket=bucket_name)
        object_keys = [{"Key": entry["Key"]} for entry in listing.get("Contents", [])]
        if object_keys:
            aws_client.s3.delete_objects(
                Bucket=bucket_name,
                Delete={"Objects": object_keys},
                **delete_kwargs,
            )

        version_listing = aws_client.s3.list_object_versions(Bucket=bucket_name)
        all_versions = version_listing.get("Versions", [])
        all_versions.extend(version_listing.get("DeleteMarkers", []))

        version_refs = [{"Key": v["Key"], "VersionId": v["VersionId"]} for v in all_versions]
        if version_refs:
            aws_client.s3.delete_objects(
                Bucket=bucket_name,
                Delete={"Objects": version_refs},
                **delete_kwargs,
            )

    yield factory
328

329

330
@pytest.fixture
def sqs_create_queue(aws_client):
    """Factory fixture creating SQS queues; deletes them on teardown."""
    created_queue_urls = []

    def factory(**kwargs):
        if "QueueName" not in kwargs:
            kwargs["QueueName"] = "test-queue-%s" % short_uid()

        queue_url = aws_client.sqs.create_queue(**kwargs)["QueueUrl"]
        created_queue_urls.append(queue_url)

        return queue_url

    yield factory

    # cleanup
    for queue_url in created_queue_urls:
        try:
            aws_client.sqs.delete_queue(QueueUrl=queue_url)
        except Exception as e:
            LOG.debug("error cleaning up queue %s: %s", queue_url, e)
352

353

354
@pytest.fixture
def sqs_receive_messages_delete(aws_client):
    """Return a factory that receives, JSON-parses, and deletes queue messages.

    NOTE: accesses ``response["Messages"]`` directly, so it raises ``KeyError``
    when no messages were received — ``sqs_receive_num_messages`` relies on that.
    """

    def factory(
        queue_url: str,
        expected_messages: Optional[int] = None,
        wait_time: Optional[int] = 5,
    ):
        response = aws_client.sqs.receive_message(
            QueueUrl=queue_url,
            MessageAttributeNames=["All"],
            VisibilityTimeout=0,
            WaitTimeSeconds=wait_time,
        )
        # Raises KeyError when the response has no "Messages" key (intentional).
        received = response["Messages"]
        parsed_bodies = [json.loads(to_str(message["Body"])) for message in received]

        if expected_messages is not None:
            assert len(parsed_bodies) == expected_messages

        for message in received:
            aws_client.sqs.delete_message(
                QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"]
            )

        return parsed_bodies

    return factory
383

384

385
@pytest.fixture
def sqs_receive_num_messages(sqs_receive_messages_delete):
    """Return a helper that polls until ``expected_messages`` messages arrived."""

    def factory(queue_url: str, expected_messages: int, max_iterations: int = 3):
        collected = []
        for _ in range(max_iterations):
            try:
                batch = sqs_receive_messages_delete(queue_url, wait_time=5)
            except KeyError:
                # there were no messages
                continue
            collected.extend(batch)

            if len(collected) >= expected_messages:
                return collected[:expected_messages]

        raise AssertionError(f"max iterations reached with {len(collected)} messages received")

    return factory
403

404

405
@pytest.fixture
def sqs_collect_messages(aws_client):
    """Collects SQS messages from a given queue_url and deletes them by default.
    Example usage:
    messages = sqs_collect_messages(
         my_queue_url,
         expected=2,
         timeout=10,
         attribute_names=["All"],
         message_attribute_names=["All"],
    )
    """

    def factory(
        queue_url: str,
        expected: int,
        timeout: int,
        delete: bool = True,
        attribute_names: list[str] = None,
        message_attribute_names: list[str] = None,
        max_number_of_messages: int = 1,
        wait_time_seconds: int = 5,
        sqs_client: "SQSClient | None" = None,
    ) -> list["MessageTypeDef"]:
        client = sqs_client or aws_client.sqs
        received: list = []

        def _poll_once():
            # Long-polls for up to wait_time_seconds (SQS maximum: 20 seconds)
            # and fetches at most max_number_of_messages (SQS maximum: 10).
            response = client.receive_message(
                QueueUrl=queue_url,
                WaitTimeSeconds=wait_time_seconds,
                MaxNumberOfMessages=max_number_of_messages,
                AttributeNames=attribute_names or [],
                MessageAttributeNames=message_attribute_names or [],
            )

            batch = response.get("Messages")
            if batch:
                received.extend(batch)

                if delete:
                    for message in batch:
                        client.delete_message(
                            QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"]
                        )

            return len(received) >= expected

        if not poll_condition(_poll_once, timeout=timeout):
            raise TimeoutError(
                f"gave up waiting for messages (expected={expected}, actual={len(received)}"
            )

        return received

    yield factory
462

463

464
@pytest.fixture
def sqs_queue(sqs_create_queue):
    """A single throwaway SQS queue; the fixture value is its URL."""
    return sqs_create_queue()
467

468

469
@pytest.fixture
def sqs_get_queue_arn(aws_client) -> Callable:
    """Return a helper that resolves a queue URL to its queue ARN."""

    def _get_queue_arn(queue_url: str) -> str:
        attributes = aws_client.sqs.get_queue_attributes(
            QueueUrl=queue_url, AttributeNames=["QueueArn"]
        )["Attributes"]
        return attributes["QueueArn"]

    return _get_queue_arn
477

478

479
@pytest.fixture
def sqs_queue_exists(aws_client):
    """Return a predicate that checks whether a queue with a given URL exists."""

    def _queue_exists(queue_url: str) -> bool:
        """
        Checks whether a queue with the given queue URL exists.
        :param queue_url: the queue URL
        :return: true if the queue exists, false otherwise
        """
        queue_name = queue_url.split("/")[-1]
        try:
            result = aws_client.sqs.get_queue_url(QueueName=queue_name)
        except ClientError as e:
            if "NonExistentQueue" in e.response["Error"]["Code"]:
                return False
            raise
        return result.get("QueueUrl") == queue_url

    yield _queue_exists
496

497

498
@pytest.fixture
def sns_create_topic(aws_client):
    """Factory fixture creating SNS topics; deletes them on teardown."""
    created_topic_arns = []

    def _create_topic(**kwargs):
        if "Name" not in kwargs:
            kwargs["Name"] = "test-topic-%s" % short_uid()
        response = aws_client.sns.create_topic(**kwargs)
        created_topic_arns.append(response["TopicArn"])
        return response

    yield _create_topic

    for topic_arn in created_topic_arns:
        try:
            aws_client.sns.delete_topic(TopicArn=topic_arn)
        except Exception as e:
            LOG.debug("error cleaning up topic %s: %s", topic_arn, e)
516

517

518
@pytest.fixture
def sns_wait_for_topic_delete(aws_client):
    """Return a waiter that polls (up to 30s) until an SNS topic no longer exists.

    The topic counts as deleted once ``get_topic_attributes`` fails with a
    NotFound error code; any other error is re-raised.
    """

    def wait_for_topic_delete(topic_arn: str) -> None:
        def wait():
            try:
                aws_client.sns.get_topic_attributes(TopicArn=topic_arn)
                return False
            # Catch ClientError specifically: the previous `except Exception`
            # accessed `e.response` and would raise AttributeError for any
            # non-botocore exception, masking the original error.
            except ClientError as e:
                if "NotFound" in e.response["Error"]["Code"]:
                    return True

                raise

        poll_condition(wait, timeout=30)

    return wait_for_topic_delete
534

535

536
@pytest.fixture
def sns_subscription(aws_client):
    """Factory fixture creating SNS subscriptions; unsubscribes them on teardown.

    The subscribe call requires 'TopicArn', 'Protocol', and 'Endpoint' kwargs;
    'ReturnSubscriptionArn' defaults to True when not set by the caller.
    """
    created_sub_arns = []

    def _create_sub(**kwargs):
        if kwargs.get("ReturnSubscriptionArn") is None:
            kwargs["ReturnSubscriptionArn"] = True

        response = aws_client.sns.subscribe(**kwargs)
        created_sub_arns.append(response["SubscriptionArn"])
        return response

    yield _create_sub

    for sub_arn in created_sub_arns:
        try:
            aws_client.sns.unsubscribe(SubscriptionArn=sub_arn)
        except Exception as e:
            LOG.debug("error cleaning up subscription %s: %s", sub_arn, e)
557

558

559
@pytest.fixture
def sns_topic(sns_create_topic, aws_client):
    """Create a throwaway SNS topic and return its attributes as reported by SNS."""
    arn = sns_create_topic()["TopicArn"]
    return aws_client.sns.get_topic_attributes(TopicArn=arn)
563

564

565
@pytest.fixture
def sns_allow_topic_sqs_queue(aws_client):
    """Return a helper granting an SNS topic permission to send to an SQS queue."""

    def _allow_sns_topic(sqs_queue_url, sqs_queue_arn, sns_topic_arn) -> None:
        # Attach a queue policy that lets the SNS service principal send
        # messages, restricted to this specific topic via aws:SourceArn.
        policy = {
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {"Service": "sns.amazonaws.com"},
                    "Action": "sqs:SendMessage",
                    "Resource": sqs_queue_arn,
                    "Condition": {"ArnEquals": {"aws:SourceArn": sns_topic_arn}},
                }
            ]
        }
        aws_client.sqs.set_queue_attributes(
            QueueUrl=sqs_queue_url,
            Attributes={"Policy": json.dumps(policy)},
        )

    return _allow_sns_topic
589

590

591
@pytest.fixture
def sns_create_sqs_subscription(sns_allow_topic_sqs_queue, sqs_get_queue_arn, aws_client):
    """Factory fixture subscribing an SQS queue to an SNS topic.

    Resolves the queue ARN, subscribes it to the topic, grants the topic
    permission to write to the queue, and returns the subscription attributes.
    Subscriptions are removed again on teardown.
    """
    subscriptions = []

    def _factory(topic_arn: str, queue_url: str, **kwargs) -> Dict[str, str]:
        queue_arn = sqs_get_queue_arn(queue_url)

        # connect sns topic to sqs
        subscription = aws_client.sns.subscribe(
            TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn, **kwargs
        )
        subscription_arn = subscription["SubscriptionArn"]

        # allow topic to write to sqs queue
        sns_allow_topic_sqs_queue(
            sqs_queue_url=queue_url, sqs_queue_arn=queue_arn, sns_topic_arn=topic_arn
        )

        subscriptions.append(subscription_arn)
        return aws_client.sns.get_subscription_attributes(SubscriptionArn=subscription_arn)[
            "Attributes"
        ]

    yield _factory

    for arn in subscriptions:
        try:
            aws_client.sns.unsubscribe(SubscriptionArn=arn)
        except Exception as e:
            # Log at debug level, consistent with every other cleanup loop in
            # this file (previously LOG.error).
            LOG.debug("error cleaning up subscription %s: %s", arn, e)
621

622

623
@pytest.fixture
def sns_create_http_endpoint(sns_create_topic, sns_subscription, aws_client):
    """Factory fixture wiring a fresh SNS topic to a local HTTP test endpoint.

    This fixture can be used with manual setup to expose the HTTPServer fixture to AWS. One
    example is to use a service like localhost.run, and set up a specific port to start the
    `HTTPServer(port=40000)` for example, and tunnel `localhost:40000` to a specific domain
    that you can manually return from this fixture.
    """
    servers = []

    def _create_http_endpoint(
        raw_message_delivery: bool = False,
    ) -> Tuple[str, str, str, HTTPServer]:
        server = HTTPServer()
        server.start()
        servers.append(server)
        server.expect_request("/sns-endpoint").respond_with_data(status=200)
        endpoint_url = server.url_for("/sns-endpoint")
        wait_for_port_open(endpoint_url)

        topic_arn = sns_create_topic()["TopicArn"]
        subscription = sns_subscription(TopicArn=topic_arn, Protocol="http", Endpoint=endpoint_url)
        subscription_arn = subscription["SubscriptionArn"]

        # Delivery policy with numRetries=0 so the endpoint sees a single attempt.
        delivery_policy = {
            "healthyRetryPolicy": {
                "minDelayTarget": 1,
                "maxDelayTarget": 1,
                "numRetries": 0,
                "numNoDelayRetries": 0,
                "numMinDelayRetries": 0,
                "numMaxDelayRetries": 0,
                "backoffFunction": "linear",
            },
            "sicklyRetryPolicy": None,
            "throttlePolicy": {"maxReceivesPerSecond": 1000},
            "guaranteed": False,
        }
        aws_client.sns.set_subscription_attributes(
            SubscriptionArn=subscription_arn,
            AttributeName="DeliveryPolicy",
            AttributeValue=json.dumps(delivery_policy),
        )

        if raw_message_delivery:
            aws_client.sns.set_subscription_attributes(
                SubscriptionArn=subscription_arn,
                AttributeName="RawMessageDelivery",
                AttributeValue="true",
            )

        return topic_arn, subscription_arn, endpoint_url, server

    yield _create_http_endpoint

    for server in servers:
        if server.is_running():
            server.stop()
677

678

679
@pytest.fixture
def route53_hosted_zone(aws_client):
    """Factory fixture creating Route53 hosted zones; deletes them on teardown.

    ``Name`` and ``CallerReference`` are generated when not supplied by the caller.
    """
    hosted_zone_ids = []

    def factory(**kwargs):
        if "Name" not in kwargs:
            kwargs["Name"] = f"www.{short_uid()}.com."
        if "CallerReference" not in kwargs:
            kwargs["CallerReference"] = f"caller-ref-{short_uid()}"
        # Forward ALL kwargs instead of only Name/CallerReference: previously any
        # additional parameter (e.g. HostedZoneConfig) was silently dropped.
        response = aws_client.route53.create_hosted_zone(**kwargs)
        hosted_zone_ids.append(response["HostedZone"]["Id"])
        return response

    yield factory

    for zone_id in hosted_zone_ids:
        try:
            aws_client.route53.delete_hosted_zone(Id=zone_id)
        except Exception as e:
            LOG.debug("error cleaning up route53 HostedZone %s: %s", zone_id, e)
701

702

703
@pytest.fixture
def transcribe_create_job(s3_bucket, aws_client):
    """Factory fixture starting Transcribe jobs from a local audio file.

    The audio file is uploaded to the ``s3_bucket`` fixture bucket and missing
    job parameters are filled with defaults. Jobs are deleted on teardown.
    """
    started_job_names = []

    def _create_job(audio_file: str, params: Optional[dict[str, Any]] = None) -> str:
        s3_key = "test-clip.wav"

        params = params or {}

        if "TranscriptionJobName" not in params:
            params["TranscriptionJobName"] = f"test-transcribe-{short_uid()}"
        if "LanguageCode" not in params:
            params["LanguageCode"] = "en-GB"
        if "Media" not in params:
            params["Media"] = {"MediaFileUri": f"s3://{s3_bucket}/{s3_key}"}

        # upload the test wav to the s3 bucket referenced by the Media default
        with open(audio_file, "rb") as f:
            aws_client.s3.upload_fileobj(f, s3_bucket, s3_key)

        response = aws_client.transcribe.start_transcription_job(**params)

        job_name = response["TranscriptionJob"]["TranscriptionJobName"]
        started_job_names.append(job_name)

        return job_name

    yield _create_job

    for job_name in started_job_names:
        with contextlib.suppress(ClientError):
            aws_client.transcribe.delete_transcription_job(TranscriptionJobName=job_name)
738

739

740
@pytest.fixture
def kinesis_create_stream(aws_client):
    """Factory fixture creating Kinesis streams; deletes them on teardown."""
    created_stream_names = []

    def _create_stream(**kwargs):
        if "StreamName" not in kwargs:
            kwargs["StreamName"] = f"test-stream-{short_uid()}"
        aws_client.kinesis.create_stream(**kwargs)
        created_stream_names.append(kwargs["StreamName"])
        return kwargs["StreamName"]

    yield _create_stream

    for stream_name in created_stream_names:
        try:
            aws_client.kinesis.delete_stream(StreamName=stream_name, EnforceConsumerDeletion=True)
        except Exception as e:
            LOG.debug("error cleaning up kinesis stream %s: %s", stream_name, e)
758

759

760
@pytest.fixture
def wait_for_stream_ready(aws_client):
    """Return a waiter polling until a Kinesis stream is ACTIVE or UPDATING."""

    def _wait_for_stream_ready(stream_name: str):
        def is_stream_ready():
            description = aws_client.kinesis.describe_stream(StreamName=stream_name)
            status = description["StreamDescription"]["StreamStatus"]
            return status in ["ACTIVE", "UPDATING"]

        return poll_condition(is_stream_ready)

    return _wait_for_stream_ready
773

774

775
@pytest.fixture
def wait_for_delivery_stream_ready(aws_client):
    """Return a waiter polling until a Firehose delivery stream is ACTIVE."""

    def _wait_for_stream_ready(delivery_stream_name: str):
        def is_stream_ready():
            description = aws_client.firehose.describe_delivery_stream(
                DeliveryStreamName=delivery_stream_name
            )
            status = description["DeliveryStreamDescription"]["DeliveryStreamStatus"]
            return status == "ACTIVE"

        # NOTE: unlike the kinesis waiter, the poll result is not returned here
        poll_condition(is_stream_ready)

    return _wait_for_stream_ready
790

791

792
@pytest.fixture
def wait_for_dynamodb_stream_ready(aws_client):
    """Return a waiter polling until a DynamoDB stream is ENABLED."""

    def _wait_for_stream_ready(stream_arn: str):
        def is_stream_ready():
            description = aws_client.dynamodbstreams.describe_stream(StreamArn=stream_arn)
            return description["StreamDescription"]["StreamStatus"] == "ENABLED"

        return poll_condition(is_stream_ready)

    return _wait_for_stream_ready
804

805

806
@pytest.fixture()
def kms_create_key(aws_client_factory):
    """Factory fixture creating KMS keys, optionally in a specific region.

    Created keys are scheduled for deletion on teardown.
    """
    created_keys = []

    def _create_key(region_name: str = None, **kwargs):
        if "Description" not in kwargs:
            kwargs["Description"] = f"test description - {short_uid()}"
        kms_client = aws_client_factory(region_name=region_name).kms
        key_metadata = kms_client.create_key(**kwargs)["KeyMetadata"]
        created_keys.append((region_name, key_metadata["KeyId"]))
        return key_metadata

    yield _create_key

    for region_name, key_id in created_keys:
        try:
            # 7 days is the shortest deletion window that can be scheduled
            aws_client_factory(region_name=region_name).kms.schedule_key_deletion(
                KeyId=key_id, PendingWindowInDays=7
            )
        except Exception as e:
            exception_message = str(e)
            # Some tests schedule their keys for deletion themselves; only log
            # when the failure is not the expected "already pending deletion".
            already_pending = (
                "KMSInvalidStateException" in exception_message
                and "is pending deletion" in exception_message
            )
            if not already_pending:
                LOG.debug("error cleaning up KMS key %s: %s", key_id, e)
835

836

837
@pytest.fixture()
def kms_replicate_key(aws_client_factory):
    """Factory fixture replicating KMS keys into another region; schedules replica deletion on teardown."""
    replicated_keys = []  # (target region, key id)

    def _replicate_key(region_from=None, **kwargs):
        target_region = kwargs.get("ReplicaRegion")
        replicated_keys.append((target_region, kwargs.get("KeyId")))
        return aws_client_factory(region_name=region_from).kms.replicate_key(**kwargs)

    yield _replicate_key

    for target_region, key_id in replicated_keys:
        try:
            # shortest amount of time you can schedule the deletion
            aws_client_factory(region_name=target_region).kms.schedule_key_deletion(
                KeyId=key_id, PendingWindowInDays=7
            )
        except Exception as e:
            LOG.debug("error cleaning up KMS key %s: %s", key_id, e)
856

857

858
# kms_create_key fixture is used here not just to be able to create aliases without a key specified,
859
# but also to make sure that kms_create_key gets executed before and teared down after kms_create_alias -
860
# to make sure that we clean up aliases before keys get cleaned up.
861
@pytest.fixture()
def kms_create_alias(kms_create_key, aws_client):
    """Factory fixture creating KMS aliases; deletes all created aliases on teardown."""
    created_aliases = []

    def _create_alias(**kwargs):
        if "AliasName" not in kwargs:
            kwargs["AliasName"] = f"alias/{short_uid()}"
        if "TargetKeyId" not in kwargs:
            # create a fresh key on demand when the caller did not supply one
            kwargs["TargetKeyId"] = kms_create_key()["KeyId"]

        aws_client.kms.create_alias(**kwargs)
        alias_name = kwargs["AliasName"]
        created_aliases.append(alias_name)
        return alias_name

    yield _create_alias

    for alias_name in created_aliases:
        try:
            aws_client.kms.delete_alias(AliasName=alias_name)
        except Exception as e:
            LOG.debug("error cleaning up KMS alias %s: %s", alias_name, e)
882

883

884
@pytest.fixture()
def kms_create_grant(kms_create_key, aws_client):
    """Factory fixture creating KMS grants; retires all created grants on teardown."""
    created_grants = []  # (grant_id, key_id)

    # Just a random ARN, since KMS in LocalStack currently doesn't validate GranteePrincipal,
    # but some GranteePrincipal is required to create a grant.
    grantee_principal_arn = (
        "arn:aws:kms:eu-central-1:123456789876:key/198a5a78-52c3-489f-ac70-b06a4d11027a"
    )

    def _create_grant(**kwargs):
        kwargs.setdefault("Operations", ["Decrypt", "Encrypt"])
        kwargs.setdefault("GranteePrincipal", grantee_principal_arn)
        if "KeyId" not in kwargs:
            kwargs["KeyId"] = kms_create_key()["KeyId"]

        grant_id = aws_client.kms.create_grant(**kwargs)["GrantId"]
        created_grants.append((grant_id, kwargs["KeyId"]))
        return grant_id, kwargs["KeyId"]

    yield _create_grant

    for grant_id, key_id in created_grants:
        try:
            aws_client.kms.retire_grant(GrantId=grant_id, KeyId=key_id)
        except Exception as e:
            LOG.debug("error cleaning up KMS grant %s: %s", grant_id, e)
913

914

915
@pytest.fixture
def kms_key(kms_create_key):
    """A single freshly created KMS key (KeyMetadata dict); cleanup happens via kms_create_key."""
    key_metadata = kms_create_key()
    return key_metadata
918

919

920
@pytest.fixture
def kms_grant_and_key(kms_key, aws_client):
    """Returns [create_grant response, key metadata] for a grant given to the caller identity."""
    grantee_arn = aws_client.sts.get_caller_identity()["Arn"]
    grant = aws_client.kms.create_grant(
        KeyId=kms_key["KeyId"],
        GranteePrincipal=grantee_arn,
        Operations=["Decrypt", "Encrypt"],
    )
    return [grant, kms_key]
932

933

934
@pytest.fixture
def opensearch_wait_for_cluster(aws_client):
    """Fixture factory blocking until an OpenSearch domain finished processing and has an endpoint."""

    def _wait_for_cluster(domain_name: str):
        def finished_processing():
            status = aws_client.opensearch.describe_domain(DomainName=domain_name)["DomainStatus"]
            return status["Processing"] is False and "Endpoint" in status

        # against AWS, domains can take a long time to come up -> poll less aggressively
        poll_kwargs = {"interval": 10} if is_aws_cloud() else {}
        assert poll_condition(
            finished_processing, timeout=25 * 60, **poll_kwargs
        ), f"could not start domain: {domain_name}"

    return _wait_for_cluster
946

947

948
@pytest.fixture
def opensearch_create_domain(opensearch_wait_for_cluster, aws_client):
    """Factory fixture creating OpenSearch domains; deletes all created domains on teardown."""
    created_domains = []

    def factory(**kwargs) -> str:
        if "DomainName" not in kwargs:
            kwargs["DomainName"] = f"test-domain-{short_uid()}"
        domain_name = kwargs["DomainName"]

        aws_client.opensearch.create_domain(**kwargs)
        # block until the cluster is actually usable
        opensearch_wait_for_cluster(domain_name=domain_name)

        created_domains.append(domain_name)
        return domain_name

    yield factory

    # cleanup
    for domain_name in created_domains:
        try:
            aws_client.opensearch.delete_domain(DomainName=domain_name)
        except Exception as e:
            LOG.debug("error cleaning up domain %s: %s", domain_name, e)
971

972

973
@pytest.fixture
def opensearch_domain(opensearch_create_domain) -> str:
    """Name of a freshly created OpenSearch domain."""
    domain_name = opensearch_create_domain()
    return domain_name
976

977

978
@pytest.fixture
def opensearch_endpoint(opensearch_domain, aws_client) -> str:
    """HTTPS endpoint URL of the domain provided by the opensearch_domain fixture."""
    domain_status = aws_client.opensearch.describe_domain(DomainName=opensearch_domain)[
        "DomainStatus"
    ]
    assert "Endpoint" in domain_status
    return f"https://{domain_status['Endpoint']}"
983

984

985
@pytest.fixture
def opensearch_document_path(opensearch_endpoint, aws_client):
    """Indexes one sample document into the domain and returns the document's URL."""
    # sample payload written into the "bountyhunters" index as document id 1
    document = {
        "first_name": "Boba",
        "last_name": "Fett",
        "age": 41,
        "about": "I'm just a simple man, trying to make my way in the universe.",
        "interests": ["mandalorian armor", "tusken culture"],
    }
    document_path = f"{opensearch_endpoint}/bountyhunters/_doc/1"
    # "Accept-encoding: identity" requests an uncompressed response
    response = requests.put(
        document_path,
        data=json.dumps(document),
        headers={"content-type": "application/json", "Accept-encoding": "identity"},
    )
    # 201 = document created
    assert response.status_code == 201, f"could not create document at: {document_path}"
    return document_path
1002

1003

1004
# Cleanup fixtures
1005
@pytest.fixture
def cleanup_stacks(aws_client):
    """Fixture factory deleting CloudFormation stacks and awaiting deletion completion."""

    def _cleanup_stacks(stacks: List[str]) -> None:
        for stack in ensure_list(stacks):
            try:
                aws_client.cloudformation.delete_stack(StackName=stack)
                waiter = aws_client.cloudformation.get_waiter("stack_delete_complete")
                waiter.wait(StackName=stack)
            except Exception:
                LOG.debug("Failed to cleanup stack '%s'", stack)

    return _cleanup_stacks
1017

1018

1019
@pytest.fixture
def cleanup_changesets(aws_client):
    """Fixture factory deleting CloudFormation change sets, ignoring individual failures."""

    def _cleanup_changesets(changesets: List[str]) -> None:
        for changeset in ensure_list(changesets):
            try:
                aws_client.cloudformation.delete_change_set(ChangeSetName=changeset)
            except Exception:
                LOG.debug("Failed to cleanup changeset '%s'", changeset)

    return _cleanup_changesets
1030

1031

1032
# Helpers for Cfn
1033

1034

1035
# TODO: exports(!)
1036
@dataclasses.dataclass(frozen=True)
class DeployResult:
    """Result of a successful deploy_cfn_template invocation."""

    change_set_id: str  # id/ARN of the executed change set
    stack_id: str  # id/ARN of the deployed stack
    stack_name: str
    change_set_name: str
    outputs: Dict[str, str]  # stack outputs mapped OutputKey -> OutputValue

    # deletes the stack and waits for the deletion to complete
    destroy: Callable[[], None]
1045

1046

1047
class StackDeployError(Exception):
    """Raised when a CloudFormation stack does not reach its target state.

    The exception message contains the stack's describe output plus either all
    stack events (with CFN_VERBOSE_ERRORS) or only the failing resource events.
    """

    def __init__(self, describe_res: dict, events: list[dict]):
        self.describe_result = describe_res
        self.events = events

        encoded_describe_output = json.dumps(self.describe_result, cls=CustomEncoder)
        formatted_events = self.format_events(events)
        if config.CFN_VERBOSE_ERRORS:
            msg = f"Describe output:\n{encoded_describe_output}\nEvents:\n{formatted_events}"
        else:
            msg = f"Describe output:\n{encoded_describe_output}\nFailing resources:\n{formatted_events}"

        super().__init__(msg)

    def format_events(self, events: list[dict]) -> str:
        # report events oldest-first; include every event in verbose mode,
        # otherwise only *_FAILED statuses
        chronological_events = sorted(events, key=lambda event: event["Timestamp"])
        return "\n".join(
            self.format_event(event)
            for event in chronological_events
            if event["ResourceStatus"].endswith("FAILED") or config.CFN_VERBOSE_ERRORS
        )

    @staticmethod
    def format_event(event: dict) -> str:
        base = f"- {event['LogicalResourceId']} ({event['ResourceType']}) -> {event['ResourceStatus']}"
        reason = event.get("ResourceStatusReason")
        if reason:
            # keep each event on a single report line
            reason = reason.replace("\n", "; ")
            return f"{base} ({reason})"
        return base
1077

1078

1079
@pytest.fixture
def deploy_cfn_template(
    aws_client: ServiceLevelClientFactory,
):
    """Factory fixture deploying CloudFormation templates via change sets.

    All stacks deployed through the returned callable are deleted on teardown,
    in reverse creation order (to respect inter-stack dependencies).
    """
    # (stack_id, destroy callable) for every stack deployed through this fixture
    state: list[tuple[str, Callable]] = []

    def _deploy(
        *,
        is_update: Optional[bool] = False,
        stack_name: Optional[str] = None,
        change_set_name: Optional[str] = None,
        template: Optional[str] = None,
        template_path: Optional[str | os.PathLike] = None,
        template_mapping: Optional[Dict[str, Any]] = None,
        parameters: Optional[Dict[str, str]] = None,
        role_arn: Optional[str] = None,
        max_wait: Optional[int] = None,
        delay_between_polls: Optional[int] = 2,
        custom_aws_client: Optional[ServiceLevelClientFactory] = None,
    ) -> DeployResult:
        """Create and execute a change set, then wait for the stack to stabilize.

        :param is_update: deploy as an UPDATE change set (requires stack_name)
        :param template: inline template body (ignored if template_path is given)
        :param template_path: path to a template file to load
        :param template_mapping: values rendered into the template before deployment
        :param parameters: CloudFormation parameter key/value pairs
        :param role_arn: optional CloudFormation service role
        :param max_wait: overall wait budget in seconds (default depends on target)
        :param custom_aws_client: alternative client factory (e.g. other account/region)
        :raises StackDeployError: if the stack fails to reach the expected state
        """
        if is_update:
            assert stack_name
        stack_name = stack_name or f"stack-{short_uid()}"
        change_set_name = change_set_name or f"change-set-{short_uid()}"

        if max_wait is None:
            # real AWS stack operations can take far longer than LocalStack's
            max_wait = 1800 if is_aws_cloud() else 180

        if template_path is not None:
            template = load_template_file(template_path)
        template_rendered = render_template(template, **(template_mapping or {}))

        kwargs = dict(
            StackName=stack_name,
            ChangeSetName=change_set_name,
            TemplateBody=template_rendered,
            Capabilities=["CAPABILITY_AUTO_EXPAND", "CAPABILITY_IAM", "CAPABILITY_NAMED_IAM"],
            ChangeSetType=("UPDATE" if is_update else "CREATE"),
            Parameters=[
                {
                    "ParameterKey": k,
                    "ParameterValue": v,
                }
                for (k, v) in (parameters or {}).items()
            ],
        )
        if role_arn is not None:
            kwargs["RoleARN"] = role_arn

        cfn_aws_client = custom_aws_client if custom_aws_client is not None else aws_client

        response = cfn_aws_client.cloudformation.create_change_set(**kwargs)

        change_set_id = response["Id"]
        stack_id = response["StackId"]

        cfn_aws_client.cloudformation.get_waiter(WAITER_CHANGE_SET_CREATE_COMPLETE).wait(
            ChangeSetName=change_set_id
        )
        cfn_aws_client.cloudformation.execute_change_set(ChangeSetName=change_set_id)
        stack_waiter = cfn_aws_client.cloudformation.get_waiter(
            WAITER_STACK_UPDATE_COMPLETE if is_update else WAITER_STACK_CREATE_COMPLETE
        )

        try:
            stack_waiter.wait(
                StackName=stack_id,
                WaiterConfig={
                    "Delay": delay_between_polls,
                    # NOTE(review): this is a float when max_wait isn't divisible by
                    # delay_between_polls -- confirm botocore keeps accepting that
                    "MaxAttempts": max_wait / delay_between_polls,
                },
            )
        except botocore.exceptions.WaiterError as e:
            # surface the describe output and failing events for easier debugging
            raise StackDeployError(
                cfn_aws_client.cloudformation.describe_stacks(StackName=stack_id)["Stacks"][0],
                cfn_aws_client.cloudformation.describe_stack_events(StackName=stack_id)[
                    "StackEvents"
                ],
            ) from e

        describe_stack_res = cfn_aws_client.cloudformation.describe_stacks(StackName=stack_id)[
            "Stacks"
        ][0]
        outputs = describe_stack_res.get("Outputs", [])

        mapped_outputs = {o["OutputKey"]: o.get("OutputValue") for o in outputs}

        def _destroy_stack():
            # delete and wait so teardown failures surface as exceptions
            cfn_aws_client.cloudformation.delete_stack(StackName=stack_id)
            cfn_aws_client.cloudformation.get_waiter(WAITER_STACK_DELETE_COMPLETE).wait(
                StackName=stack_id,
                WaiterConfig={
                    "Delay": delay_between_polls,
                    "MaxAttempts": max_wait / delay_between_polls,
                },
            )

        state.append((stack_id, _destroy_stack))

        return DeployResult(
            change_set_id, stack_id, stack_name, change_set_name, mapped_outputs, _destroy_stack
        )

    yield _deploy

    # delete the stacks in the reverse order they were created in case of inter-stack dependencies
    for stack_id, teardown in state[::-1]:
        try:
            teardown()
        except Exception as e:
            LOG.debug("Failed cleaning up stack stack_id=%s: %s", stack_id, e)
1190

1191

1192
@pytest.fixture
def is_change_set_created_and_available(aws_client):
    """Factory producing poll predicates checking that a change set is ready to execute."""

    def _is_change_set_created_and_available(change_set_id: str):
        def _inner():
            # TODO: CREATE_FAILED should also not lead to further retries
            change_set = aws_client.cloudformation.describe_change_set(ChangeSetName=change_set_id)
            created = change_set.get("Status") == "CREATE_COMPLETE"
            available = change_set.get("ExecutionStatus") == "AVAILABLE"
            return created and available

        return _inner

    return _is_change_set_created_and_available
1206

1207

1208
@pytest.fixture
def is_change_set_failed_and_unavailable(aws_client):
    """Factory producing poll predicates checking that a change set ended up
    FAILED with ExecutionStatus UNAVAILABLE.

    Note: the inner functions were previously named "_is_change_set_created_and_available"
    (copy-paste from the sibling fixture) although this fixture checks the failure state.
    """

    def _is_change_set_failed_and_unavailable(change_set_id: str):
        def _inner():
            change_set = aws_client.cloudformation.describe_change_set(ChangeSetName=change_set_id)
            return (
                change_set.get("Status") == "FAILED"
                and change_set.get("ExecutionStatus") == "UNAVAILABLE"
            )

        return _inner

    return _is_change_set_failed_and_unavailable
1222

1223

1224
@pytest.fixture
def is_stack_created(aws_client):
    """Poll-predicate factory: stack reached a terminal CREATE status."""
    terminal_statuses = ["CREATE_COMPLETE", "CREATE_FAILED"]
    return _has_stack_status(aws_client.cloudformation, terminal_statuses)
1227

1228

1229
@pytest.fixture
def is_stack_updated(aws_client):
    """Poll-predicate factory: stack reached a terminal UPDATE status."""
    terminal_statuses = ["UPDATE_COMPLETE", "UPDATE_FAILED"]
    return _has_stack_status(aws_client.cloudformation, terminal_statuses)
1232

1233

1234
@pytest.fixture
def is_stack_deleted(aws_client):
    """Poll-predicate factory: stack deletion completed."""
    terminal_statuses = ["DELETE_COMPLETE"]
    return _has_stack_status(aws_client.cloudformation, terminal_statuses)
1237

1238

1239
def _has_stack_status(cfn_client, statuses: List[str]):
1✔
1240
    def _has_status(stack_id: str):
1✔
1241
        def _inner():
1✔
1242
            resp = cfn_client.describe_stacks(StackName=stack_id)
1✔
1243
            s = resp["Stacks"][0]  # since the lookup  uses the id we can only get a single response
1✔
1244
            return s.get("StackStatus") in statuses
1✔
1245

1246
        return _inner
1✔
1247

1248
    return _has_status
1✔
1249

1250

1251
@pytest.fixture
def is_change_set_finished(aws_client):
    """Poll-predicate factory for change set execution.

    The predicate raises ShortCircuitWaitException when execution failed, so
    pollers stop retrying immediately instead of running into their timeout.
    """

    def _is_change_set_finished(change_set_id: str, stack_name: Optional[str] = None):
        def _inner():
            kwargs = {"ChangeSetName": change_set_id}
            if stack_name:
                kwargs["StackName"] = stack_name

            check_set = aws_client.cloudformation.describe_change_set(**kwargs)
            execution_status = check_set.get("ExecutionStatus")

            if execution_status == "EXECUTE_FAILED":
                LOG.warning("Change set failed")
                raise ShortCircuitWaitException()

            return execution_status == "EXECUTE_COMPLETE"

        return _inner

    return _is_change_set_finished
1270

1271

1272
@pytest.fixture
def wait_until_lambda_ready(aws_client):
    """Fixture factory blocking until a Lambda function leaves the Pending state.

    :param function_name: name of the function to wait for
    :param qualifier: optional version/alias qualifier to check
    :param client: optional lambda client override (defaults to aws_client.lambda_)
    """

    def _wait_until_ready(function_name: str, qualifier: str = None, client=None):
        client = client or aws_client.lambda_

        def _is_not_pending():
            kwargs = {}
            if qualifier:
                kwargs["Qualifier"] = qualifier
            try:
                # fix: the qualifier was previously collected but never passed on,
                # so the state of the unqualified function was checked instead
                result = (
                    client.get_function(FunctionName=function_name, **kwargs)["Configuration"][
                        "State"
                    ]
                    != "Pending"
                )
                LOG.debug("lambda state result: result=%s", result)
                return result
            except Exception as e:
                LOG.error(e)
                raise

        wait_until(_is_not_pending)

    return _wait_until_ready
1295

1296

1297
role_assume_policy = """
1✔
1298
{
1299
  "Version": "2012-10-17",
1300
  "Statement": [
1301
    {
1302
      "Effect": "Allow",
1303
      "Principal": {
1304
        "Service": "lambda.amazonaws.com"
1305
      },
1306
      "Action": "sts:AssumeRole"
1307
    }
1308
  ]
1309
}
1310
""".strip()
1311

1312
role_policy = """
1✔
1313
{
1314
    "Version": "2012-10-17",
1315
    "Statement": [
1316
        {
1317
            "Effect": "Allow",
1318
            "Action": [
1319
                "logs:CreateLogGroup",
1320
                "logs:CreateLogStream",
1321
                "logs:PutLogEvents"
1322
            ],
1323
            "Resource": [
1324
                "*"
1325
            ]
1326
        }
1327
    ]
1328
}
1329
""".strip()
1330

1331

1332
@pytest.fixture
def create_lambda_function_aws(aws_client):
    """Factory fixture wrapping lambda_.create_function; waits until the function
    left the Pending state and deletes all created functions on teardown."""
    created_function_arns = []

    def _create_lambda_function(**kwargs):
        def _create_function():
            response = aws_client.lambda_.create_function(**kwargs)
            created_function_arns.append(response["FunctionArn"])

            def _is_not_pending():
                try:
                    state = aws_client.lambda_.get_function(FunctionName=response["FunctionName"])[
                        "Configuration"
                    ]["State"]
                    return state != "Pending"
                except Exception as e:
                    LOG.error(e)
                    raise

            wait_until(_is_not_pending)
            return response

        # @AWS, takes about 10s until the role/policy is "active", until then it will fail
        # localstack should normally not require the retries and will just continue here
        return retry(_create_function, retries=3, sleep=4)

    yield _create_lambda_function

    for function_arn in created_function_arns:
        try:
            aws_client.lambda_.delete_function(FunctionName=function_arn)
        except Exception:
            LOG.debug("Unable to delete function arn=%s in cleanup", function_arn)
1368

1369

1370
@pytest.fixture
def create_lambda_function(aws_client, wait_until_lambda_ready, lambda_su_role):
    """Factory fixture creating Lambda functions via testutil; waits for readiness
    and removes functions plus their log groups on teardown."""
    created_functions = []  # (function arn, client used to create it)
    created_log_groups = []
    default_lambda_client = aws_client.lambda_
    logs_client = aws_client.logs
    s3_client = aws_client.s3

    def _create_lambda_function(*args, **kwargs):
        client = kwargs.get("client") or default_lambda_client
        kwargs["client"] = client
        kwargs["s3_client"] = s3_client
        func_name = kwargs.pop("func_name", None)
        assert func_name

        if not kwargs.get("role"):
            # default to the superuser role fixture when no role was supplied
            kwargs["role"] = lambda_su_role

        def _create_function():
            response = testutil.create_lambda_function(func_name, **kwargs)
            created_functions.append((response["CreateFunctionResponse"]["FunctionArn"], client))
            wait_until_lambda_ready(function_name=func_name, client=client)
            created_log_groups.append(f"/aws/lambda/{func_name}")
            return response

        # @AWS, takes about 10s until the role/policy is "active", until then it will fail
        # localstack should normally not require the retries and will just continue here
        return retry(_create_function, retries=3, sleep=4)

    yield _create_lambda_function

    for function_arn, client in created_functions:
        try:
            client.delete_function(FunctionName=function_arn)
        except Exception:
            LOG.debug("Unable to delete function arn=%s in cleanup", function_arn)

    for log_group_name in created_log_groups:
        try:
            logs_client.delete_log_group(logGroupName=log_group_name)
        except Exception:
            LOG.debug("Unable to delete log group %s in cleanup", log_group_name)
1414

1415

1416
@pytest.fixture
def create_echo_http_server(aws_client, create_lambda_function):
    """Factory fixture deploying a Lambda-backed echo HTTP server behind a function URL."""
    # imported here rather than at module level -- presumably to avoid an import
    # cycle at load time; TODO confirm
    from localstack.aws.api.lambda_ import Runtime

    lambda_client = aws_client.lambda_
    # source code of the Lambda handler that echoes the incoming HTTP request
    handler_code = textwrap.dedent(
        """
    import json
    import os


    def make_response(body: dict, status_code: int = 200):
        return {
            "statusCode": status_code,
            "headers": {"Content-Type": "application/json"},
            "body": body,
        }


    def trim_headers(headers):
        if not int(os.getenv("TRIM_X_HEADERS", 0)):
            return headers
        return {
            key: value for key, value in headers.items()
            if not (key.startswith("x-amzn") or key.startswith("x-forwarded-"))
        }


    def handler(event, context):
        print(json.dumps(event))
        response = {
            "args": event.get("queryStringParameters", {}),
            "data": event.get("body", ""),
            "domain": event["requestContext"].get("domainName", ""),
            "headers": trim_headers(event.get("headers", {})),
            "method": event["requestContext"]["http"].get("method", ""),
            "origin": event["requestContext"]["http"].get("sourceIp", ""),
            "path": event["requestContext"]["http"].get("path", ""),
        }
        return make_response(response)"""
    )

    def _create_echo_http_server(trim_x_headers: bool = False) -> str:
        """Creates a server that will echo any request. Any request will be returned with the
        following format. Any unset values will have those defaults.
        `trim_x_headers` can be set to True to trim some headers that are automatically added by lambda in
        order to create easier Snapshot testing. Default: `False`
        {
          "args": {},
          "headers": {},
          "data": "",
          "method": "",
          "domain": "",
          "origin": "",
          "path": ""
        }"""
        zip_file = testutil.create_lambda_archive(handler_code, get_content=True)
        func_name = f"echo-http-{short_uid()}"
        # the TRIM_X_HEADERS env var toggles header trimming inside the handler
        create_lambda_function(
            func_name=func_name,
            zip_file=zip_file,
            runtime=Runtime.python3_12,
            envvars={"TRIM_X_HEADERS": "1" if trim_x_headers else "0"},
        )
        url_response = lambda_client.create_function_url_config(
            FunctionName=func_name, AuthType="NONE"
        )
        # allow unauthenticated invocation of the function URL
        aws_client.lambda_.add_permission(
            FunctionName=func_name,
            StatementId="urlPermission",
            Action="lambda:InvokeFunctionUrl",
            Principal="*",
            FunctionUrlAuthType="NONE",
        )
        return url_response["FunctionUrl"]

    yield _create_echo_http_server
1493

1494

1495
@pytest.fixture
def create_event_source_mapping(aws_client):
    """Factory fixture for Lambda event source mappings; deletes them on teardown."""
    created_uuids = []

    def _create_event_source_mapping(*args, **kwargs):
        response = aws_client.lambda_.create_event_source_mapping(*args, **kwargs)
        created_uuids.append(response["UUID"])
        return response

    yield _create_event_source_mapping

    for mapping_uuid in created_uuids:
        try:
            aws_client.lambda_.delete_event_source_mapping(UUID=mapping_uuid)
        except Exception:
            LOG.debug("Unable to delete event source mapping %s in cleanup", mapping_uuid)
1511

1512

1513
@pytest.fixture
def check_lambda_logs(aws_client):
    """Fixture factory asserting that expected lines appear in a function's logs.

    Expected lines containing ".*" are treated as regex patterns (DOTALL); all
    other lines must match a log message exactly. Returns all log messages.
    """

    def _check_logs(func_name: str, expected_lines: List[str] = None) -> List[str]:
        expected = expected_lines or []
        log_events = get_lambda_logs(func_name, logs_client=aws_client.logs)
        log_messages = [event["message"] for event in log_events]
        for line in expected:
            if ".*" in line:
                # regex pattern: matching any single message is sufficient
                if any(re.match(line, message, flags=re.DOTALL) for message in log_messages):
                    continue
            assert line in log_messages
        return log_messages

    return _check_logs
1529

1530

1531
@pytest.fixture
def create_policy(aws_client):
    """Factory fixture creating IAM policies; deletes all created policies on teardown."""
    created_policies = []  # (policy arn, iam client used to create it)

    def _create_policy(*args, iam_client=None, **kwargs):
        iam_client = iam_client or aws_client.iam
        if "PolicyName" not in kwargs:
            kwargs["PolicyName"] = f"policy-{short_uid()}"
        response = iam_client.create_policy(*args, **kwargs)
        created_policies.append((response["Policy"]["Arn"], iam_client))
        return response

    yield _create_policy

    for policy_arn, iam_client in created_policies:
        try:
            iam_client.delete_policy(PolicyArn=policy_arn)
        except Exception:
            LOG.debug("Could not delete policy '%s' during test cleanup", policy_arn)
1551

1552

1553
@pytest.fixture
def create_user(aws_client):
    """Factory fixture creating IAM users.

    On teardown, each user's inline policies, attached managed policies and
    access keys are removed first (IAM requires this), then the user is deleted.
    """
    usernames = []

    def _create_user(**kwargs):
        if "UserName" not in kwargs:
            kwargs["UserName"] = f"user-{short_uid()}"
        response = aws_client.iam.create_user(**kwargs)
        usernames.append(response["User"]["UserName"])
        return response

    yield _create_user

    for username in usernames:
        try:
            inline_policies = aws_client.iam.list_user_policies(UserName=username)["PolicyNames"]
        except ClientError as e:
            LOG.debug(
                "Cannot list user policies: %s. User %s probably already deleted...", e, username
            )
            continue

        # inline policies must be deleted before the user can be removed
        for inline_policy in inline_policies:
            try:
                aws_client.iam.delete_user_policy(UserName=username, PolicyName=inline_policy)
            except Exception:
                LOG.debug(
                    "Could not delete user policy '%s' from '%s' during cleanup",
                    inline_policy,
                    username,
                )
        # managed policies are detached, not deleted -- they may be shared
        attached_policies = aws_client.iam.list_attached_user_policies(UserName=username)[
            "AttachedPolicies"
        ]
        for attached_policy in attached_policies:
            try:
                aws_client.iam.detach_user_policy(
                    UserName=username, PolicyArn=attached_policy["PolicyArn"]
                )
            except Exception:
                LOG.debug(
                    "Error detaching policy '%s' from user '%s'",
                    attached_policy["PolicyArn"],
                    username,
                )
        # access keys also block user deletion
        access_keys = aws_client.iam.list_access_keys(UserName=username)["AccessKeyMetadata"]
        for access_key in access_keys:
            try:
                aws_client.iam.delete_access_key(
                    UserName=username, AccessKeyId=access_key["AccessKeyId"]
                )
            except Exception:
                LOG.debug(
                    "Error deleting access key '%s' from user '%s'",
                    access_key["AccessKeyId"],
                    username,
                )

        try:
            aws_client.iam.delete_user(UserName=username)
        except Exception as e:
            LOG.debug("Error deleting user '%s' during test cleanup: %s", username, e)
1615

1616

1617
@pytest.fixture
def wait_and_assume_role(aws_client):
    """Returns a helper that assumes an IAM role, retrying until AWS permits it."""

    def _wait_and_assume_role(role_arn: str, session_name: str = None, **kwargs):
        session_name = session_name or f"session-{short_uid()}"

        def _assume():
            response = aws_client.sts.assume_role(
                RoleArn=role_arn, RoleSessionName=session_name, **kwargs
            )
            return response["Credentials"]

        # need to retry a couple of times before we are allowed to assume this role in AWS
        return retry(_assume, sleep=5, retries=4)

    return _wait_and_assume_role
1633

1634

1635
@pytest.fixture
def create_role(aws_client):
    """Factory fixture creating IAM roles; best-effort cleanup (detach managed
    policies, delete inline policies, delete role) runs on teardown."""
    created = []

    def _create_role(iam_client=None, **kwargs):
        if not kwargs.get("RoleName"):
            kwargs["RoleName"] = f"role-{short_uid()}"
        client = iam_client or aws_client.iam
        result = client.create_role(**kwargs)
        created.append((result["Role"]["RoleName"], client))
        return result

    yield _create_role

    for role_name, client in created:
        # detach managed policies first; skip the role entirely if it is gone
        try:
            attached = client.list_attached_role_policies(RoleName=role_name)[
                "AttachedPolicies"
            ]
        except ClientError as e:
            LOG.debug(
                "Cannot list attached role policies: %s. Role %s probably already deleted...",
                e,
                role_name,
            )
            continue
        for policy in attached:
            try:
                client.detach_role_policy(RoleName=role_name, PolicyArn=policy["PolicyArn"])
            except Exception:
                LOG.debug(
                    "Could not detach role policy '%s' from '%s' during cleanup",
                    policy["PolicyArn"],
                    role_name,
                )
        # then delete inline policies, and finally the role itself
        for inline_name in client.list_role_policies(RoleName=role_name)["PolicyNames"]:
            try:
                client.delete_role_policy(RoleName=role_name, PolicyName=inline_name)
            except Exception:
                LOG.debug(
                    "Could not delete role policy '%s' from '%s' during cleanup",
                    inline_name,
                    role_name,
                )
        try:
            client.delete_role(RoleName=role_name)
        except Exception:
            LOG.debug("Could not delete role '%s' during cleanup", role_name)
1687

1688

1689
@pytest.fixture
def create_parameter(aws_client):
    """Factory fixture creating SSM parameters, deleted again on teardown.

    All kwargs are forwarded to ``ssm.put_parameter``; a ``Name`` kwarg is required.
    """
    params = []

    def _create_parameter(**kwargs):
        params.append(kwargs["Name"])
        return aws_client.ssm.put_parameter(**kwargs)

    yield _create_parameter

    for param in params:
        # best-effort cleanup, consistent with the sibling fixtures: a single
        # failed delete must not abort the teardown of the remaining parameters
        try:
            aws_client.ssm.delete_parameter(Name=param)
        except Exception as e:
            LOG.debug("Error cleaning up SSM parameter '%s': %s", param, e)
1701

1702

1703
@pytest.fixture
def create_secret(aws_client):
    """Factory fixture creating Secrets Manager secrets; force-deleted on teardown."""
    secret_arns = []

    def _create_secret(**kwargs):
        response = aws_client.secretsmanager.create_secret(**kwargs)
        secret_arns.append(response["ARN"])
        return response

    yield _create_secret

    for secret_arn in secret_arns:
        aws_client.secretsmanager.delete_secret(
            SecretId=secret_arn, ForceDeleteWithoutRecovery=True
        )
1716

1717

1718
# TODO Figure out how to make cert creation tests pass against AWS.
1719
#
1720
# We would like to have localstack tests to pass not just against localstack, but also against AWS to make sure
1721
# our emulation is correct. Unfortunately, with certificate creation there are some issues.
1722
#
1723
# In AWS newly created ACM certificates have to be validated either by email or by DNS. The latter is
1724
# by adding some CNAME records as requested by AWS in response to a certificate request.
1725
# For testing purposes the DNS one seems to be easier, at least as long as DNS is handled by Route 53, the AWS DNS service.
1726
#
1727
# The other possible option is to use IAM certificates instead of ACM ones. Those just have to be uploaded from files
1728
# created by openssl etc. Not sure if there are other issues after that.
1729
#
1730
# The third option might be having in AWS some certificates created in advance - so they do not require validation
1731
# and can be easily used in tests. The issue with such an approach is that for AppSync, for example, in order to
1732
# register a domain name (https://docs.aws.amazon.com/appsync/latest/APIReference/API_CreateDomainName.html),
1733
# the domain name in the API request has to match the domain name used in certificate creation. Which means that with
1734
# pre-created certificates we would have to use specific domain names instead of random ones.
1735
@pytest.fixture
def acm_request_certificate(aws_client_factory):
    """Factory fixture requesting ACM certificates, deleted again on teardown.

    Accepts an optional ``region_name`` kwarg to target a specific region; all
    remaining kwargs are forwarded to ``acm.request_certificate``.
    """
    certificate_arns = []

    # note: annotation corrected from `-> str` - the factory returns the full
    # request_certificate response dict, not just the ARN
    def factory(**kwargs) -> dict:
        if "DomainName" not in kwargs:
            kwargs["DomainName"] = f"test-domain-{short_uid()}.localhost.localstack.cloud"

        region_name = kwargs.pop("region_name", None)
        acm_client = aws_client_factory(region_name=region_name).acm

        response = acm_client.request_certificate(**kwargs)
        created_certificate_arn = response["CertificateArn"]
        # remember the region so cleanup targets the same client
        certificate_arns.append((created_certificate_arn, region_name))
        return response

    yield factory

    # cleanup
    for certificate_arn, region_name in certificate_arns:
        try:
            acm_client = aws_client_factory(region_name=region_name).acm
            acm_client.delete_certificate(CertificateArn=certificate_arn)
        except Exception as e:
            LOG.debug("error cleaning up certificate %s: %s", certificate_arn, e)
1760

1761

1762
# "super-user" policy document: allows every action on every resource,
# used for autogenerated test roles where permissions are not under test
role_policy_su = {
    "Version": "2012-10-17",
    "Statement": [{"Effect": "Allow", "Action": ["*"], "Resource": ["*"]}],
}
1766

1767

1768
@pytest.fixture(scope="session")
def lambda_su_role(aws_client):
    """Session-scoped IAM role with an allow-everything policy, for Lambda tests.

    Yields the role ARN; role, policy, and their attachment are removed on teardown.
    """
    role_name = f"lambda-autogenerated-{short_uid()}"
    role = aws_client.iam.create_role(
        RoleName=role_name, AssumeRolePolicyDocument=role_assume_policy
    )["Role"]
    policy_name = f"lambda-autogenerated-{short_uid()}"
    policy_arn = aws_client.iam.create_policy(
        PolicyName=policy_name, PolicyDocument=json.dumps(role_policy_su)
    )["Policy"]["Arn"]
    aws_client.iam.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)

    if is_aws_cloud():  # dirty but necessary
        time.sleep(10)

    yield role["Arn"]

    # fix: pass the callable plus kwargs to run_safe; previously the client call
    # was invoked eagerly (`run_safe(client.detach_role_policy(...))`), so any
    # exception escaped the run_safe wrapper and aborted the remaining cleanup
    run_safe(aws_client.iam.detach_role_policy, RoleName=role_name, PolicyArn=policy_arn)
    run_safe(aws_client.iam.delete_role, RoleName=role_name)
    run_safe(aws_client.iam.delete_policy, PolicyArn=policy_arn)
1788

1789

1790
@pytest.fixture
def create_iam_role_and_attach_policy(aws_client):
    """
    Fixture that creates an IAM role with given role definition and predefined policy ARN.

    Use this fixture with AWS managed policies like 'AmazonS3ReadOnlyAccess' or 'AmazonKinesisFullAccess'.
    """
    roles = []

    def _inner(**kwargs: dict[str, any]) -> str:
        """
        :param dict RoleDefinition: role definition document
        :param str PolicyArn: policy ARN
        :param str RoleName: role name (autogenerated if omitted)
        :return: role ARN
        """
        if "RoleName" not in kwargs:
            kwargs["RoleName"] = f"test-role-{short_uid()}"

        role = kwargs["RoleName"]
        role_policy = json.dumps(kwargs["RoleDefinition"])

        result = aws_client.iam.create_role(RoleName=role, AssumeRolePolicyDocument=role_policy)
        role_arn = result["Role"]["Arn"]

        policy_arn = kwargs["PolicyArn"]
        aws_client.iam.attach_role_policy(PolicyArn=policy_arn, RoleName=role)

        # track the policy ARN as well so it can be detached during cleanup
        roles.append((role, policy_arn))
        return role_arn

    yield _inner

    for role, policy_arn in roles:
        # DeleteRole fails while managed policies are still attached, so detach first
        try:
            aws_client.iam.detach_role_policy(RoleName=role, PolicyArn=policy_arn)
        except Exception as exc:
            LOG.debug("Error detaching policy from IAM role '%s': %s", role, exc)
        try:
            aws_client.iam.delete_role(RoleName=role)
        except Exception as exc:
            LOG.debug("Error deleting IAM role '%s': %s", role, exc)
1828

1829

1830
@pytest.fixture
def create_iam_role_with_policy(aws_client):
    """
    Fixture that creates an IAM role with given role definition and policy definition.
    """
    roles = {}

    def _create_role_and_policy(**kwargs: dict[str, any]) -> str:
        """
        :param dict RoleDefinition: role definition document
        :param dict PolicyDefinition: policy definition document
        :param str PolicyName: policy name (autogenerated if omitted)
        :param str RoleName: role name (autogenerated if omitted)
        :return: role ARN
        """
        kwargs.setdefault("RoleName", f"test-role-{short_uid()}")
        kwargs.setdefault("PolicyName", f"test-policy-{short_uid()}")
        role_name = kwargs["RoleName"]
        policy_name = kwargs["PolicyName"]

        create_result = aws_client.iam.create_role(
            RoleName=role_name, AssumeRolePolicyDocument=json.dumps(kwargs["RoleDefinition"])
        )
        aws_client.iam.put_role_policy(
            RoleName=role_name,
            PolicyName=policy_name,
            PolicyDocument=json.dumps(kwargs["PolicyDefinition"]),
        )
        roles[role_name] = policy_name
        return create_result["Role"]["Arn"]

    yield _create_role_and_policy

    for role_name, policy_name in roles.items():
        # delete the inline policy before the role; both are best-effort
        try:
            aws_client.iam.delete_role_policy(RoleName=role_name, PolicyName=policy_name)
        except Exception as exc:
            LOG.debug("Error deleting IAM role policy '%s' '%s': %s", role_name, policy_name, exc)
        try:
            aws_client.iam.delete_role(RoleName=role_name)
        except Exception as exc:
            LOG.debug("Error deleting IAM role '%s': %s", role_name, exc)
1874

1875

1876
@pytest.fixture
def firehose_create_delivery_stream(wait_for_delivery_stream_ready, aws_client):
    """Factory fixture creating Firehose delivery streams and waiting until they
    are ready; streams are deleted on teardown."""
    stream_names = []

    def _create_delivery_stream(**kwargs):
        kwargs.setdefault("DeliveryStreamName", f"test-delivery-stream-{short_uid()}")
        stream_name = kwargs["DeliveryStreamName"]
        response = aws_client.firehose.create_delivery_stream(**kwargs)
        stream_names.append(stream_name)
        wait_for_delivery_stream_ready(stream_name)
        return response

    yield _create_delivery_stream

    for stream_name in stream_names:
        try:
            aws_client.firehose.delete_delivery_stream(DeliveryStreamName=stream_name)
        except Exception:
            LOG.info("Failed to delete delivery stream %s", stream_name)
1896

1897

1898
@pytest.fixture
def ses_configuration_set(aws_client):
    """Factory fixture creating SES configuration sets, removed again on teardown."""
    created_names = []

    def factory(name: str) -> None:
        aws_client.ses.create_configuration_set(
            ConfigurationSet={
                "Name": name,
            },
        )
        created_names.append(name)

    yield factory

    for name in created_names:
        aws_client.ses.delete_configuration_set(ConfigurationSetName=name)
1914

1915

1916
@pytest.fixture
def ses_configuration_set_sns_event_destination(aws_client):
    """Factory fixture wiring SNS event destinations into SES configuration sets;
    destinations are removed again on teardown."""
    created = []

    def factory(config_set_name: str, event_destination_name: str, topic_arn: str) -> None:
        aws_client.ses.create_configuration_set_event_destination(
            ConfigurationSetName=config_set_name,
            EventDestination={
                "Name": event_destination_name,
                "Enabled": True,
                "MatchingEventTypes": ["send", "bounce", "delivery", "open", "click"],
                "SNSDestination": {
                    "TopicARN": topic_arn,
                },
            },
        )
        created.append((config_set_name, event_destination_name))

    yield factory

    for config_set_name, event_destination_name in created:
        aws_client.ses.delete_configuration_set_event_destination(
            ConfigurationSetName=config_set_name,
            EventDestinationName=event_destination_name,
        )
1941

1942

1943
@pytest.fixture
def ses_email_template(aws_client):
    """Factory fixture creating SES email templates, deleted again on teardown."""
    template_names = []

    def factory(name: str, contents: str, subject: str = None):
        # generate the default subject per call: the previous default argument
        # `f"Email template {short_uid()}"` was evaluated only once at function
        # definition time, so every call shared the same "unique" subject
        if subject is None:
            subject = f"Email template {short_uid()}"
        aws_client.ses.create_template(
            Template={
                "TemplateName": name,
                "SubjectPart": subject,
                "TextPart": contents,
            }
        )
        template_names.append(name)

    yield factory

    for template_name in template_names:
        aws_client.ses.delete_template(TemplateName=template_name)
1961

1962

1963
@pytest.fixture
def ses_verify_identity(aws_client):
    """Factory fixture verifying SES email identities, deleted again on teardown."""
    identities = []

    def factory(email_address: str) -> None:
        aws_client.ses.verify_email_identity(EmailAddress=email_address)
        # fix: track the identity so teardown actually deletes it - previously
        # the list was never populated and the cleanup loop was a no-op
        identities.append(email_address)

    yield factory

    for identity in identities:
        aws_client.ses.delete_identity(Identity=identity)
1974

1975

1976
@pytest.fixture
def ec2_create_security_group(aws_client):
    """Factory fixture creating EC2 security groups with optional TCP ingress
    ports open to the world; groups are deleted on teardown."""
    group_ids = []

    def factory(ports=None, **kwargs):
        if "GroupName" not in kwargs:
            kwargs["GroupName"] = f"test-sg-{short_uid()}"
        security_group = aws_client.ec2.create_security_group(**kwargs)

        # one ingress rule per requested port, open to 0.0.0.0/0
        ingress_rules = []
        for port in ports or []:
            ingress_rules.append(
                {
                    "FromPort": port,
                    "IpProtocol": "tcp",
                    "IpRanges": [{"CidrIp": "0.0.0.0/0"}],
                    "ToPort": port,
                }
            )
        aws_client.ec2.authorize_security_group_ingress(
            GroupName=kwargs["GroupName"],
            IpPermissions=ingress_rules,
        )

        group_ids.append(security_group["GroupId"])
        return security_group

    yield factory

    for group_id in group_ids:
        try:
            aws_client.ec2.delete_security_group(GroupId=group_id)
        except Exception as e:
            LOG.debug("Error cleaning up EC2 security group: %s, %s", group_id, e)
2009

2010

2011
@pytest.fixture
def cleanups():
    """Yields a list to which tests append teardown callables; executed in
    reverse registration order (LIFO) after the test finishes."""
    registered = []

    yield registered

    # LIFO so dependent resources are torn down before their dependencies
    for callback in reversed(registered):
        try:
            callback()
        except Exception as e:
            LOG.warning("Failed to execute cleanup", exc_info=e)
2022

2023

2024
@pytest.fixture(scope="session")
def account_id(aws_client):
    """Returns the active AWS account ID, or the static test ID when STS is unavailable."""
    if not (is_aws_cloud() or is_api_enabled("sts")):
        return TEST_AWS_ACCOUNT_ID
    return aws_client.sts.get_caller_identity()["Account"]
2030

2031

2032
@pytest.fixture(scope="session")
def region_name(aws_client):
    """Returns the active AWS region, or the static test region when STS is unavailable."""
    if not (is_aws_cloud() or is_api_enabled("sts")):
        return TEST_AWS_REGION_NAME
    return aws_client.sts.meta.region_name
2038

2039

2040
@pytest.fixture(scope="session")
def partition(region_name):
    """Returns the AWS partition (e.g. ``aws``, ``aws-cn``) for the active region."""
    return get_partition(region_name)
2043

2044

2045
@pytest.fixture(scope="session")
def secondary_account_id(secondary_aws_client):
    """Returns the secondary AWS account ID, or the static fallback when STS is unavailable."""
    if not (is_aws_cloud() or is_api_enabled("sts")):
        return SECONDARY_TEST_AWS_ACCOUNT_ID
    return secondary_aws_client.sts.get_caller_identity()["Account"]
2051

2052

2053
@pytest.fixture(scope="session")
def secondary_region_name():
    """Returns the region used for cross-region tests."""
    return SECONDARY_TEST_AWS_REGION_NAME
2056

2057

2058
@pytest.hookimpl
def pytest_collection_modifyitems(config: Config, items: list[Item]):
    """Attaches standard markers to collected test items."""
    skip_if_aws = pytest.mark.skipif(
        is_aws_cloud(),
        reason="test only applicable if run against localstack",
    )
    for item in items:
        for mark in item.iter_markers():
            if mark.name.endswith("only_localstack"):
                item.add_marker(skip_if_aws)
        if "snapshot" in getattr(item, "fixturenames", ()):
            # add a marker that indicates that this test is snapshot validated
            # if it uses the snapshot fixture -> allows selecting only snapshot
            # validated tests in order to capture new snapshots for a whole
            # test file with "-m snapshot_validated"
            item.add_marker("snapshot_validated")
2074

2075

2076
@pytest.fixture
def sample_stores() -> AccountRegionBundle:
    """Provides a throwaway store bundle exercising cross-account, cross-region,
    and region-local attributes."""

    class SampleStore(BaseStore):
        CROSS_ACCOUNT_ATTR = CrossAccountAttribute(default=list)
        CROSS_REGION_ATTR = CrossRegionAttribute(default=list)
        region_specific_attr = LocalAttribute(default=list)

    return AccountRegionBundle("zzz", SampleStore, validate=False)
2084

2085

2086
@pytest.fixture
def create_rest_apigw(aws_client_factory):
    """Factory fixture creating API Gateway REST APIs; on teardown it deletes the
    API first, then any usage plans (and their API keys) linked to it."""
    rest_apis = []
    retry_boto_config = None
    if is_aws_cloud():
        retry_boto_config = botocore.config.Config(
            # Api gateway can throttle requests pretty heavily. Leading to potentially undeleted apis
            retries={"max_attempts": 10, "mode": "adaptive"}
        )

    def _create_apigateway_function(**kwargs):
        client_region_name = kwargs.pop("region_name", None)
        apigateway_client = aws_client_factory(
            region_name=client_region_name, config=retry_boto_config
        ).apigateway
        kwargs.setdefault("name", f"api-{short_uid()}")

        response = apigateway_client.create_rest_api(**kwargs)
        api_id = response.get("id")
        rest_apis.append((api_id, client_region_name))

        return api_id, response.get("name"), response.get("rootResourceId")

    yield _create_apigateway_function

    for rest_api_id, _client_region_name in rest_apis:
        apigateway_client = aws_client_factory(
            region_name=_client_region_name,
            config=retry_boto_config,
        ).apigateway
        # First, retrieve the usage plans associated with the REST API.
        # fix: collect each plan id at most once - the previous per-stage
        # collection produced duplicate ids when an API was linked via several
        # stages, making the second delete_usage_plan call fail during teardown
        usage_plan_ids = []
        usage_plans = apigateway_client.get_usage_plans()
        for item in usage_plans.get("items", []):
            api_stages = item.get("apiStages", [])
            if any(api_stage.get("apiId") == rest_api_id for api_stage in api_stages):
                usage_plan_ids.append(item.get("id"))

        # Then delete the API, as you can't delete the UsagePlan if a stage is associated with it
        with contextlib.suppress(Exception):
            apigateway_client.delete_rest_api(restApiId=rest_api_id)

        # finally delete the usage plans and the API Keys linked to it
        for usage_plan_id in usage_plan_ids:
            usage_plan_keys = apigateway_client.get_usage_plan_keys(usagePlanId=usage_plan_id)
            for key in usage_plan_keys.get("items", []):
                apigateway_client.delete_api_key(apiKey=key["id"])
            apigateway_client.delete_usage_plan(usagePlanId=usage_plan_id)
2135

2136

2137
@pytest.fixture
def create_rest_apigw_openapi(aws_client_factory):
    """Factory fixture importing REST APIs from OpenAPI definitions; the APIs
    are deleted (best-effort) on teardown."""
    imported_apis = []

    def _create_apigateway_function(**kwargs):
        region_name = kwargs.pop("region_name", None)
        client = aws_client_factory(region_name=region_name).apigateway

        response = client.import_rest_api(**kwargs)
        api_id = response.get("id")
        imported_apis.append((api_id, region_name))
        return api_id, response

    yield _create_apigateway_function

    for api_id, region_name in imported_apis:
        with contextlib.suppress(Exception):
            client = aws_client_factory(region_name=region_name).apigateway
            client.delete_rest_api(restApiId=api_id)
2156

2157

2158
@pytest.fixture
def appsync_create_api(aws_client):
    """Factory fixture creating AppSync GraphQL APIs, deleted again on teardown."""
    api_ids = []

    def factory(**kwargs):
        if "name" not in kwargs:
            kwargs["name"] = f"graphql-api-testing-name-{short_uid()}"
        if not kwargs.get("authenticationType"):
            kwargs["authenticationType"] = "API_KEY"

        api = aws_client.appsync.create_graphql_api(**kwargs)["graphqlApi"]
        api_ids.append(api["apiId"])
        return api

    yield factory

    for api_id in api_ids:
        try:
            aws_client.appsync.delete_graphql_api(apiId=api_id)
        except Exception as e:
            LOG.debug("Error cleaning up AppSync API: %s, %s", api_id, e)
2179

2180

2181
@pytest.fixture
def assert_host_customisation(monkeypatch):
    """Patches LOCALSTACK_HOST to a sentinel value and yields an asserter that
    verifies whether a URL was built from the configured host (or a custom one)."""
    sentinel_host = "foo.bar"
    monkeypatch.setattr(
        config, "LOCALSTACK_HOST", config.HostAndPort(host=sentinel_host, port=8888)
    )

    def asserter(
        url: str,
        *,
        custom_host: Optional[str] = None,
    ):
        if custom_host is None:
            assert sentinel_host in url, f"Could not find `{sentinel_host}` in `{url}`"
        else:
            assert custom_host in url, f"Could not find `{custom_host}` in `{url}`"
            # a custom host must fully replace the configured one
            assert sentinel_host not in url

    yield asserter
2201

2202

2203
@pytest.fixture
def echo_http_server(httpserver: HTTPServer):
    """Spins up a local HTTP echo server and returns the endpoint URL"""

    def _echo(request: Request) -> Response:
        parsed_json = None
        if request.is_json:
            # tolerate bodies that claim to be JSON but do not parse
            with contextlib.suppress(ValueError):
                parsed_json = json.loads(request.data)
        payload = {
            "data": request.data or "{}",
            "headers": dict(request.headers),
            "url": request.url,
            "method": request.method,
            "json": parsed_json,
        }
        return Response(json.dumps(json_safe(payload)), status=200)

    httpserver.expect_request("").respond_with_handler(_echo)
    return httpserver.url_for("/")
2226

2227

2228
@pytest.fixture
def echo_http_server_post(echo_http_server):
    """
    Returns an HTTP echo server URL for POST requests that work both locally and for parity tests (against real AWS)
    """
    if is_aws_cloud():
        return f"{PUBLIC_HTTP_ECHO_SERVER_URL}/post"

    return f"{echo_http_server}post"
2237

2238

2239
def create_policy_doc(effect: str, actions: List, resource=None) -> Dict:
    """Builds a single-statement IAM policy document with a generated statement id."""
    statement = {
        # TODO statement ids have to be alphanumeric [0-9A-Za-z], write a test for it
        "Sid": f"s{short_uid()}",
        "Effect": effect,
        "Action": ensure_list(actions),
        "Resource": resource or "*",
    }
    return {
        "Version": "2012-10-17",
        "Statement": [statement],
    }
2254

2255

2256
@pytest.fixture
def create_policy_generated_document(create_policy):
    """Returns a helper that creates an IAM policy from a generated
    single-statement document and returns the policy ARN."""

    def _create_policy_with_doc(effect, actions, policy_name=None, resource=None, iam_client=None):
        name = policy_name or f"p-{short_uid()}"
        document = create_policy_doc(effect, actions, resource=resource)
        response = create_policy(
            PolicyName=name, PolicyDocument=json.dumps(document), iam_client=iam_client
        )
        return response["Policy"]["Arn"]

    return _create_policy_with_doc
2268

2269

2270
@pytest.fixture
def create_role_with_policy(create_role, create_policy_generated_document, aws_client):
    """Returns a helper that creates an IAM role with the given assume-role doc
    plus a generated policy, either attached (managed) or inlined."""

    def _create_role_with_policy(
        effect, actions, assume_policy_doc, resource=None, attach=True, iam_client=None
    ):
        client = iam_client or aws_client.iam

        role_name = f"role-{short_uid()}"
        create_result = create_role(
            RoleName=role_name, AssumeRolePolicyDocument=assume_policy_doc, iam_client=client
        )
        role_arn = create_result["Role"]["Arn"]
        policy_name = f"p-{short_uid()}"

        if attach:
            # create a managed policy and attach it to the role
            policy_arn = create_policy_generated_document(
                effect, actions, policy_name=policy_name, resource=resource, iam_client=client
            )
            client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
        else:
            # embed the policy inline via put_role_policy
            inline_document = json.dumps(create_policy_doc(effect, actions, resource=resource))
            client.put_role_policy(
                RoleName=role_name, PolicyName=policy_name, PolicyDocument=inline_document
            )

        return role_name, role_arn

    return _create_role_with_policy
2301

2302

2303
@pytest.fixture
def create_user_with_policy(create_policy_generated_document, create_user, aws_client):
    """Returns a helper creating an IAM user with an attached generated policy;
    yields the username together with freshly created access keys."""

    def _create_user_with_policy(effect, actions, resource=None):
        policy_arn = create_policy_generated_document(effect, actions, resource=resource)
        username = f"user-{short_uid()}"
        create_user(UserName=username)
        aws_client.iam.attach_user_policy(UserName=username, PolicyArn=policy_arn)
        access_key = aws_client.iam.create_access_key(UserName=username)["AccessKey"]
        return username, access_key

    return _create_user_with_policy
2314

2315

2316
@pytest.fixture()
def register_extension(s3_bucket, aws_client):
    """Factory fixture registering CloudFormation extensions from a local handler
    package; every registered version is deregistered again on teardown."""
    cfn_client = aws_client.cloudformation
    registered_arns = []

    def _register(extension_name, extension_type, artifact_path):
        # upload the handler package so CloudFormation can fetch it from S3
        artifact_key = f"artifact-{short_uid()}"
        aws_client.s3.upload_file(artifact_path, s3_bucket, artifact_key)

        register_response = cfn_client.register_type(
            Type=extension_type,
            TypeName=extension_name,
            SchemaHandlerPackage=f"s3://{s3_bucket}/{artifact_key}",
        )

        registration_token = register_response["RegistrationToken"]
        cfn_client.get_waiter("type_registration_complete").wait(
            RegistrationToken=registration_token
        )

        describe_response = cfn_client.describe_type_registration(
            RegistrationToken=registration_token
        )

        registered_arns.append(describe_response["TypeArn"])
        cfn_client.set_type_default_version(Arn=describe_response["TypeVersionArn"])

        return describe_response

    yield _register

    for type_arn in registered_arns:
        # deregister each version individually, then the type itself
        for version in cfn_client.list_type_versions(Arn=type_arn)["TypeVersionSummaries"]:
            try:
                cfn_client.deregister_type(Arn=version["Arn"])
            except Exception:
                continue
        cfn_client.deregister_type(Arn=type_arn)
2357

2358

2359
@pytest.fixture
def hosted_zone(aws_client):
    """Factory fixture creating Route53 hosted zones, deleted in reverse
    creation order on teardown."""
    zone_ids = []

    def factory(**kwargs):
        kwargs.setdefault("CallerReference", f"ref-{short_uid()}")
        response = aws_client.route53.create_hosted_zone(**kwargs)
        zone_ids.append(response["HostedZone"]["Id"])
        return response

    yield factory

    for zone_id in reversed(zone_ids):
        aws_client.route53.delete_hosted_zone(Id=zone_id)
2375

2376

2377
@pytest.fixture
def openapi_validate(monkeypatch):
    """Enables OpenAPI request and response validation for the test's duration."""
    for option in ("OPENAPI_VALIDATE_RESPONSE", "OPENAPI_VALIDATE_REQUEST"):
        monkeypatch.setattr(config, option, "true")
2381

2382

2383
@pytest.fixture
def set_resource_custom_id():
    """Yields a setter pinning custom IDs onto resource identifiers; every
    custom ID is unset again on teardown."""
    registered_identifiers = []

    def _set_custom_id(resource_identifier: ResourceIdentifier, custom_id):
        localstack_id_manager.set_custom_id(
            resource_identifier=resource_identifier, custom_id=custom_id
        )
        registered_identifiers.append(resource_identifier)

    yield _set_custom_id

    for resource_identifier in registered_identifiers:
        localstack_id_manager.unset_custom_id(resource_identifier)
2397

2398

2399
###############################
2400
# Events (EventBridge) fixtures
2401
###############################
2402

2403

2404
@pytest.fixture
def events_create_event_bus(aws_client, region_name, account_id):
    """Factory fixture to create EventBridge event buses.

    On teardown, each created bus is fully cleaned up: all rules (with their
    targets) and all archives referencing the bus are deleted before the bus
    itself is removed. Cleanup is best-effort and logs failures.
    """
    event_bus_names = []

    def _create_event_bus(**kwargs):
        if "Name" not in kwargs:
            kwargs["Name"] = f"test-event-bus-{short_uid()}"

        response = aws_client.events.create_event_bus(**kwargs)
        event_bus_names.append(kwargs["Name"])
        return response

    def _delete_rules(event_bus_name):
        # A rule with targets cannot be deleted, so remove all targets first.
        response = aws_client.events.list_rules(EventBusName=event_bus_name)
        rules = [rule["Name"] for rule in response["Rules"]]

        # Delete all rules for the current event bus
        for rule in rules:
            try:
                response = aws_client.events.list_targets_by_rule(
                    Rule=rule, EventBusName=event_bus_name
                )
                targets = [target["Id"] for target in response["Targets"]]

                # Remove all targets for the current rule
                if targets:
                    for target in targets:
                        aws_client.events.remove_targets(
                            Rule=rule, EventBusName=event_bus_name, Ids=[target]
                        )

                aws_client.events.delete_rule(Name=rule, EventBusName=event_bus_name)
            except Exception as e:
                LOG.warning("Failed to delete rule %s: %s", rule, e)

    def _delete_archives(event_bus_name):
        # Archives reference the bus by its ARN, not by its name.
        event_source_arn = (
            f"arn:aws:events:{region_name}:{account_id}:event-bus/{event_bus_name}"
        )
        response = aws_client.events.list_archives(EventSourceArn=event_source_arn)
        archives = [archive["ArchiveName"] for archive in response["Archives"]]
        for archive in archives:
            try:
                aws_client.events.delete_archive(ArchiveName=archive)
            except Exception as e:
                LOG.warning("Failed to delete archive %s: %s", archive, e)

    yield _create_event_bus

    for event_bus_name in event_bus_names:
        try:
            _delete_rules(event_bus_name)
            _delete_archives(event_bus_name)
            aws_client.events.delete_event_bus(Name=event_bus_name)
        except Exception as e:
            LOG.warning("Failed to delete event bus %s: %s", event_bus_name, e)
2457

2458

2459
@pytest.fixture
def events_put_rule(aws_client):
    """Factory fixture wrapping ``events.put_rule``.

    Rules created through the factory (and their targets) are deleted again
    on teardown; failures are logged and do not abort the remaining cleanup.
    """
    created_rules = []

    def _put_rule(**kwargs):
        if "Name" not in kwargs:
            kwargs["Name"] = f"rule-{short_uid()}"

        response = aws_client.events.put_rule(**kwargs)
        created_rules.append((kwargs["Name"], kwargs.get("EventBusName", "default")))
        return response

    yield _put_rule

    for rule_name, bus_name in created_rules:
        try:
            listed = aws_client.events.list_targets_by_rule(
                Rule=rule_name, EventBusName=bus_name
            )
            # a rule with targets cannot be deleted, so remove them first
            for target_id in (target["Id"] for target in listed["Targets"]):
                aws_client.events.remove_targets(
                    Rule=rule_name, EventBusName=bus_name, Ids=[target_id]
                )
            aws_client.events.delete_rule(Name=rule_name, EventBusName=bus_name)
        except Exception as e:
            LOG.warning("Failed to delete rule %s: %s", rule_name, e)
2490

2491

2492
@pytest.fixture
def events_create_rule(aws_client):
    """Factory fixture to create EventBridge rules from a name, bus, event
    pattern and schedule expression; returns the rule ARN.

    Created rules (and their targets) are deleted on teardown; failures are
    logged and do not abort the remaining cleanup, consistent with the other
    event fixtures in this file.
    """
    rules = []

    def _create_rule(**kwargs):
        rule_name = kwargs["Name"]
        bus_name = kwargs.get("EventBusName", "")
        pattern = kwargs.get("EventPattern", {})
        schedule = kwargs.get("ScheduleExpression", "")
        rule_arn = aws_client.events.put_rule(
            Name=rule_name,
            EventBusName=bus_name,
            EventPattern=json.dumps(pattern),
            ScheduleExpression=schedule,
        )["RuleArn"]
        rules.append({"name": rule_name, "bus": bus_name})
        return rule_arn

    yield _create_rule

    for rule in rules:
        try:
            targets = aws_client.events.list_targets_by_rule(
                Rule=rule["name"], EventBusName=rule["bus"]
            )["Targets"]

            # a rule with targets cannot be deleted, so remove them first
            target_ids = [target["Id"] for target in targets]
            if target_ids:
                aws_client.events.remove_targets(
                    Rule=rule["name"], EventBusName=rule["bus"], Ids=target_ids
                )

            aws_client.events.delete_rule(Name=rule["name"], EventBusName=rule["bus"])
        except Exception as e:
            LOG.warning("Failed to delete rule %s: %s", rule["name"], e)
2524

2525

2526
@pytest.fixture
def sqs_as_events_target(aws_client, sqs_get_queue_arn):
    """Create SQS queues prepared for use as EventBridge targets.

    The factory returns ``(queue_url, queue_arn)`` and attaches a queue policy
    that allows ``events.amazonaws.com`` to send messages to the queue.
    Created queues are deleted (best-effort) on teardown.
    """
    created_queue_urls = []

    def _sqs_as_events_target(queue_name: str | None = None) -> tuple[str, str]:
        name = queue_name or f"tests-queue-{short_uid()}"
        queue_url = aws_client.sqs.create_queue(QueueName=name)["QueueUrl"]
        created_queue_urls.append(queue_url)
        queue_arn = sqs_get_queue_arn(queue_url)

        # grant EventBridge permission to deliver messages into this queue
        send_statement = {
            "Sid": f"SendMessage-{short_uid()}",
            "Effect": "Allow",
            "Principal": {"Service": "events.amazonaws.com"},
            "Action": "sqs:SendMessage",
            "Resource": queue_arn,
        }
        policy = {
            "Version": "2012-10-17",
            "Id": f"sqs-eventbridge-{short_uid()}",
            "Statement": [send_statement],
        }
        aws_client.sqs.set_queue_attributes(
            QueueUrl=queue_url, Attributes={"Policy": json.dumps(policy)}
        )
        return queue_url, queue_arn

    yield _sqs_as_events_target

    for queue_url in created_queue_urls:
        try:
            aws_client.sqs.delete_queue(QueueUrl=queue_url)
        except Exception as e:
            LOG.debug("error cleaning up queue %s: %s", queue_url, e)
2562

2563

2564
@pytest.fixture
def clean_up(
    aws_client,
):  # TODO: legacy clean up fixtures for eventbridge - remove and use individual fixtures for creating resources instead
    def _clean_up(
        bus_name=None,
        rule_name=None,
        target_ids=None,
        queue_url=None,
        log_group_name=None,
    ):
        """Best-effort deletion of the given EventBridge/SQS/CloudWatch Logs
        resources; each delete is wrapped in call_safe, so failures are ignored."""
        events_client = aws_client.events
        # only pass EventBusName when a bus was given (default bus otherwise)
        kwargs = {"EventBusName": bus_name} if bus_name else {}
        if target_ids:
            # accept a single target id or a list of ids
            target_ids = target_ids if isinstance(target_ids, list) else [target_ids]
            call_safe(
                events_client.remove_targets,
                kwargs=dict(Rule=rule_name, Ids=target_ids, Force=True, **kwargs),
            )
        if rule_name:
            call_safe(events_client.delete_rule, kwargs=dict(Name=rule_name, Force=True, **kwargs))
        if bus_name:
            call_safe(events_client.delete_event_bus, kwargs=dict(Name=bus_name))
        if queue_url:
            sqs_client = aws_client.sqs
            call_safe(sqs_client.delete_queue, kwargs=dict(QueueUrl=queue_url))
        if log_group_name:
            logs_client = aws_client.logs

            def _delete_log_group():
                # delete all log streams before removing the log group itself
                log_streams = logs_client.describe_log_streams(logGroupName=log_group_name)
                for log_stream in log_streams["logStreams"]:
                    logs_client.delete_log_stream(
                        logGroupName=log_group_name, logStreamName=log_stream["logStreamName"]
                    )
                logs_client.delete_log_group(logGroupName=log_group_name)

            call_safe(_delete_log_group)

    yield _clean_up
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc