• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / a53c0273-9548-479e-ab3c-f3af40c0e980

13 May 2025 05:31PM UTC coverage: 86.624% (-0.03%) from 86.658%
a53c0273-9548-479e-ab3c-f3af40c0e980

push

circleci

web-flow
ASF: Mark optional params as such (X | None) (#12614)

5 of 7 new or added lines in 2 files covered. (71.43%)

34 existing lines in 16 files now uncovered.

64347 of 74283 relevant lines covered (86.62%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.45
/localstack-core/localstack/testing/pytest/fixtures.py
1
import contextlib
1✔
2
import dataclasses
1✔
3
import json
1✔
4
import logging
1✔
5
import os
1✔
6
import re
1✔
7
import textwrap
1✔
8
import time
1✔
9
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple
1✔
10

11
import botocore.auth
1✔
12
import botocore.config
1✔
13
import botocore.credentials
1✔
14
import botocore.session
1✔
15
import pytest
1✔
16
from _pytest.config import Config
1✔
17
from _pytest.nodes import Item
1✔
18
from botocore.exceptions import ClientError
1✔
19
from botocore.regions import EndpointResolver
1✔
20
from pytest_httpserver import HTTPServer
1✔
21
from werkzeug import Request, Response
1✔
22

23
from localstack import config
1✔
24
from localstack.aws.api.ec2 import CreateSecurityGroupRequest
1✔
25
from localstack.aws.connect import ServiceLevelClientFactory
1✔
26
from localstack.services.stores import (
1✔
27
    AccountRegionBundle,
28
    BaseStore,
29
    CrossAccountAttribute,
30
    CrossRegionAttribute,
31
    LocalAttribute,
32
)
33
from localstack.testing.aws.cloudformation_utils import load_template_file, render_template
1✔
34
from localstack.testing.aws.util import get_lambda_logs, is_aws_cloud
1✔
35
from localstack.testing.config import (
1✔
36
    SECONDARY_TEST_AWS_ACCOUNT_ID,
37
    SECONDARY_TEST_AWS_REGION_NAME,
38
    TEST_AWS_ACCOUNT_ID,
39
    TEST_AWS_REGION_NAME,
40
)
41
from localstack.utils import testutil
1✔
42
from localstack.utils.aws.arns import get_partition
1✔
43
from localstack.utils.aws.client import SigningHttpClient
1✔
44
from localstack.utils.aws.resources import create_dynamodb_table
1✔
45
from localstack.utils.bootstrap import is_api_enabled
1✔
46
from localstack.utils.collections import ensure_list, select_from_typed_dict
1✔
47
from localstack.utils.functions import call_safe, run_safe
1✔
48
from localstack.utils.http import safe_requests as requests
1✔
49
from localstack.utils.id_generator import ResourceIdentifier, localstack_id_manager
1✔
50
from localstack.utils.json import CustomEncoder, json_safe
1✔
51
from localstack.utils.net import wait_for_port_open
1✔
52
from localstack.utils.strings import short_uid, to_str
1✔
53
from localstack.utils.sync import ShortCircuitWaitException, poll_condition, retry, wait_until
1✔
54

55
LOG = logging.getLogger(__name__)
1✔
56

57
# URL of public HTTP echo server, used primarily for AWS parity/snapshot testing
58
PUBLIC_HTTP_ECHO_SERVER_URL = "http://httpbin.org"
1✔
59

60
WAITER_CHANGE_SET_CREATE_COMPLETE = "change_set_create_complete"
1✔
61
WAITER_STACK_CREATE_COMPLETE = "stack_create_complete"
1✔
62
WAITER_STACK_UPDATE_COMPLETE = "stack_update_complete"
1✔
63
WAITER_STACK_DELETE_COMPLETE = "stack_delete_complete"
1✔
64

65

66
if TYPE_CHECKING:
1✔
67
    from mypy_boto3_sqs import SQSClient
×
68
    from mypy_boto3_sqs.type_defs import MessageTypeDef
×
69

70

71
@pytest.fixture(scope="session")
def aws_client_no_retry(aws_client_factory):
    """
    Session-scoped factory for Boto clients that never retry failed calls.

    Prefer this client when a test asserts on errors (pytest.raises, expected
    status codes such as 500): botocore's legacy retry mode transparently
    re-issues requests for connection errors (ConnectionError,
    ConnectionClosedError, ReadTimeoutError, EndpointConnectionError),
    service-side throttling errors (Throttling, ThrottlingException,
    ThrottledException, RequestThrottledException,
    ProvisionedThroughputExceededException), and HTTP status codes
    429/500/502/503/504/509, which slows such tests down and can make
    time-bound error conditions flaky.

    botocore docs: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#configuring-a-retry-mode
    Legacy retry mode: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#legacy-retry-mode

    A plain `ResourceNotFound` error is never retried, so using this fixture
    for it is harmless but not required.
    """
    return aws_client_factory(config=botocore.config.Config(retries={"max_attempts": 1}))
103
@pytest.fixture(scope="class")
def aws_http_client_factory(aws_session):
    """
    Returns a factory for creating new ``SigningHttpClient`` instances using a configurable botocore request signer.
    The default signer is a SigV4QueryAuth. The credentials are extracted from the ``boto3_sessions`` fixture that
    transparently uses your global profile when TEST_TARGET=AWS_CLOUD, or test credentials when running against
    LocalStack.

    Example invocations

        client = aws_signing_http_client_factory("sqs")
        client.get("http://localhost:4566/000000000000/my-queue")

    or
        client = aws_signing_http_client_factory("dynamodb", signer_factory=SigV4Auth)
        client.post("...")
    """

    def factory(
        service: str,
        # optional parameters are marked as such (previously implicit `str = None`,
        # which PEP 484 disallows)
        region: Optional[str] = None,
        signer_factory: Callable[
            [botocore.credentials.Credentials, str, str], botocore.auth.BaseSigner
        ] = botocore.auth.SigV4QueryAuth,
        endpoint_url: Optional[str] = None,
        aws_access_key_id: Optional[str] = None,
        aws_secret_access_key: Optional[str] = None,
    ):
        region = region or TEST_AWS_REGION_NAME

        # explicit credentials win over the session's credentials
        if aws_access_key_id or aws_secret_access_key:
            credentials = botocore.credentials.Credentials(
                access_key=aws_access_key_id, secret_key=aws_secret_access_key
            )
        else:
            credentials = aws_session.get_credentials()

        creds = credentials.get_frozen_credentials()

        if not endpoint_url:
            if is_aws_cloud():
                # FIXME: this is a bit raw. we should probably re-use boto in a better way
                resolver: EndpointResolver = aws_session._session.get_component("endpoint_resolver")
                endpoint_url = "https://" + resolver.construct_endpoint(service, region)["hostname"]
            else:
                endpoint_url = config.internal_service_url()

        return SigningHttpClient(signer_factory(creds, service, region), endpoint_url=endpoint_url)

    return factory
155
@pytest.fixture(scope="class")
def s3_vhost_client(aws_client_factory, region_name):
    """S3 client configured to use virtual-host style bucket addressing."""
    vhost_config = botocore.config.Config(s3={"addressing_style": "virtual"})
    clients = aws_client_factory(config=vhost_config, region_name=region_name)
    return clients.s3
162
@pytest.fixture
def dynamodb_wait_for_table_active(aws_client):
    """Returns a helper that blocks (up to 30s) until a DynamoDB table is ACTIVE."""

    def wait_for_table_active(table_name: str, client=None):
        ddb = client or aws_client.dynamodb

        def _is_active():
            table = ddb.describe_table(TableName=table_name)["Table"]
            return table["TableStatus"] == "ACTIVE"

        poll_condition(_is_active, timeout=30)

    return wait_for_table_active
175
@pytest.fixture
def dynamodb_create_table_with_parameters(dynamodb_wait_for_table_active, aws_client):
    """Factory creating DynamoDB tables from raw boto parameters; tables are removed on teardown."""
    created_tables = []

    def factory(**kwargs):
        if "TableName" not in kwargs:
            kwargs["TableName"] = f"test-table-{short_uid()}"

        table_name = kwargs["TableName"]
        created_tables.append(table_name)
        response = aws_client.dynamodb.create_table(**kwargs)
        dynamodb_wait_for_table_active(table_name)
        return response

    yield factory

    # cleanup
    for table in created_tables:
        try:
            # table has to be in ACTIVE state before deletion
            dynamodb_wait_for_table_active(table)
            aws_client.dynamodb.delete_table(TableName=table)
        except Exception as e:
            LOG.debug("error cleaning up table %s: %s", table, e)

200
@pytest.fixture
def dynamodb_create_table(dynamodb_wait_for_table_active, aws_client):
    """Factory creating DynamoDB tables via the create_dynamodb_table utility.

    Beware: this swallows exceptions raised inside the create_dynamodb_table utility function.
    """
    created_tables = []

    def factory(**kwargs):
        kwargs["client"] = aws_client.dynamodb
        if "table_name" not in kwargs:
            kwargs["table_name"] = f"test-table-{short_uid()}"
        kwargs.setdefault("partition_key", "id")

        created_tables.append(kwargs["table_name"])

        return create_dynamodb_table(**kwargs)

    yield factory

    # cleanup
    for table in created_tables:
        try:
            # table has to be in ACTIVE state before deletion
            dynamodb_wait_for_table_active(table)
            aws_client.dynamodb.delete_table(TableName=table)
        except Exception as e:
            LOG.debug("error cleaning up table %s: %s", table, e)
228
@pytest.fixture
def s3_create_bucket(s3_empty_bucket, aws_client):
    """Factory creating S3 buckets that are emptied and deleted again on teardown."""
    created_buckets = []

    def factory(**kwargs) -> str:
        if "Bucket" not in kwargs:
            kwargs["Bucket"] = "test-bucket-%s" % short_uid()

        # outside of us-east-1, the bucket location must be passed explicitly
        region = aws_client.s3.meta.region_name
        if region != "us-east-1" and "CreateBucketConfiguration" not in kwargs:
            kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}

        aws_client.s3.create_bucket(**kwargs)
        bucket_name = kwargs["Bucket"]
        created_buckets.append(bucket_name)
        return bucket_name

    yield factory

    # cleanup
    for bucket in created_buckets:
        try:
            s3_empty_bucket(bucket)
            aws_client.s3.delete_bucket(Bucket=bucket)
        except Exception as e:
            LOG.debug("error cleaning up bucket %s: %s", bucket, e)
259
@pytest.fixture
def s3_create_bucket_with_client(s3_empty_bucket, aws_client):
    """Like s3_create_bucket, but the caller supplies the S3 client used for creation."""
    created_buckets = []

    def factory(s3_client, **kwargs) -> str:
        if "Bucket" not in kwargs:
            kwargs["Bucket"] = f"test-bucket-{short_uid()}"

        response = s3_client.create_bucket(**kwargs)
        created_buckets.append(kwargs["Bucket"])
        return response

    yield factory

    # cleanup is done with the default client, not the one used for creation
    for bucket in created_buckets:
        try:
            s3_empty_bucket(bucket)
            aws_client.s3.delete_bucket(Bucket=bucket)
        except Exception as e:
            LOG.debug("error cleaning up bucket %s: %s", bucket, e)
282
@pytest.fixture
def s3_bucket(s3_create_bucket, aws_client) -> str:
    """Name of a freshly created S3 bucket in the client's region (cleaned up afterwards)."""
    region = aws_client.s3.meta.region_name
    if region == "us-east-1":
        return s3_create_bucket()
    # outside us-east-1 the location constraint must be given explicitly
    return s3_create_bucket(CreateBucketConfiguration={"LocationConstraint": region})
291
@pytest.fixture
def s3_empty_bucket(aws_client):
    """
    Returns a factory that, given a bucket name, deletes all objects and all object versions.
    """

    # Boto resource would make this a straightforward task, but our internal client does not support Boto resource
    def factory(bucket_name: str):
        kwargs = {}
        try:
            aws_client.s3.get_object_lock_configuration(Bucket=bucket_name)
            # locked objects can only be removed when bypassing governance retention
            kwargs["BypassGovernanceRetention"] = True
        except ClientError:
            # bucket has no object-lock configuration
            pass

        def _delete_batched(object_identifiers: list):
            # DeleteObjects accepts at most 1000 keys per request
            for i in range(0, len(object_identifiers), 1000):
                aws_client.s3.delete_objects(
                    Bucket=bucket_name,
                    Delete={"Objects": object_identifiers[i : i + 1000]},
                    **kwargs,
                )

        # delete all current objects; paginate so buckets with more than 1000
        # objects are fully emptied (fixes the previous single-page listing)
        objects = []
        for page in aws_client.s3.get_paginator("list_objects_v2").paginate(Bucket=bucket_name):
            objects.extend({"Key": obj["Key"]} for obj in page.get("Contents", []))
        if objects:
            _delete_batched(objects)

        # delete all object versions and delete markers (paginated as well)
        object_versions = []
        for page in aws_client.s3.get_paginator("list_object_versions").paginate(
            Bucket=bucket_name
        ):
            versions = page.get("Versions", [])
            versions.extend(page.get("DeleteMarkers", []))
            object_versions.extend(
                {"Key": obj["Key"], "VersionId": obj["VersionId"]} for obj in versions
            )
        if object_versions:
            _delete_batched(object_versions)

    yield factory
331
@pytest.fixture
def sqs_create_queue(aws_client):
    """Factory creating SQS queues that are deleted again on teardown."""
    created_queue_urls = []

    def factory(**kwargs):
        if "QueueName" not in kwargs:
            kwargs["QueueName"] = "test-queue-%s" % short_uid()

        queue_url = aws_client.sqs.create_queue(**kwargs)["QueueUrl"]
        created_queue_urls.append(queue_url)

        return queue_url

    yield factory

    # cleanup
    for queue_url in created_queue_urls:
        try:
            aws_client.sqs.delete_queue(QueueUrl=queue_url)
        except Exception as e:
            LOG.debug("error cleaning up queue %s: %s", queue_url, e)
355
@pytest.fixture
def sqs_receive_messages_delete(aws_client):
    """Returns a factory that receives, JSON-decodes, and deletes messages from an SQS queue."""

    def factory(
        queue_url: str,
        expected_messages: Optional[int] = None,
        wait_time: Optional[int] = 5,
    ):
        """Receive messages from the queue, delete them, and return their parsed bodies.

        :param queue_url: URL of the queue to receive from
        :param expected_messages: if set, assert that exactly this many messages were received
        :param wait_time: long-polling wait time in seconds passed to receive_message
        :return: the received message bodies, each parsed from JSON
        :raises KeyError: if no messages were received ("Messages" is absent from the
            response). NOTE: this is deliberate — sqs_receive_num_messages catches this
            KeyError to detect an empty poll; do not replace the indexing with .get().
        """
        response = aws_client.sqs.receive_message(
            QueueUrl=queue_url,
            MessageAttributeNames=["All"],
            VisibilityTimeout=0,
            WaitTimeSeconds=wait_time,
        )
        messages = []
        # deliberately indexes "Messages": raises KeyError when the queue was empty
        for m in response["Messages"]:
            message = json.loads(to_str(m["Body"]))
            messages.append(message)

        if expected_messages is not None:
            assert len(messages) == expected_messages

        # delete everything that was received so repeated polls don't see it again
        for message in response["Messages"]:
            aws_client.sqs.delete_message(
                QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"]
            )

        return messages

    return factory
386
@pytest.fixture
def sqs_receive_num_messages(sqs_receive_messages_delete):
    """Returns a helper that polls until at least `expected_messages` messages were received."""

    def factory(queue_url: str, expected_messages: int, max_iterations: int = 3):
        all_messages = []
        for _ in range(max_iterations):
            try:
                batch = sqs_receive_messages_delete(queue_url, wait_time=5)
            except KeyError:
                # an empty poll raises KeyError ("Messages" missing) — try again
                continue
            all_messages.extend(batch)

            if len(all_messages) >= expected_messages:
                return all_messages[:expected_messages]

        raise AssertionError(f"max iterations reached with {len(all_messages)} messages received")

    return factory
406
@pytest.fixture
def sqs_collect_messages(aws_client):
    """Collects SQS messages from a given queue_url and deletes them by default.

    Example usage:

        messages = sqs_collect_messages(
            my_queue_url,
            expected=2,
            timeout=10,
            attribute_names=["All"],
            message_attribute_names=["All"],
        )
    """

    def factory(
        queue_url: str,
        expected: int,
        timeout: int,
        delete: bool = True,
        attribute_names: list[str] = None,
        message_attribute_names: list[str] = None,
        max_number_of_messages: int = 1,
        wait_time_seconds: int = 5,
        sqs_client: "SQSClient | None" = None,
    ) -> list["MessageTypeDef"]:
        client = sqs_client or aws_client.sqs
        collected = []

        def _poll_once():
            response = client.receive_message(
                QueueUrl=queue_url,
                # long polling; the SQS maximum is 20 seconds
                WaitTimeSeconds=wait_time_seconds,
                # at most 10 messages per API call
                MaxNumberOfMessages=max_number_of_messages,
                AttributeNames=attribute_names or [],
                MessageAttributeNames=message_attribute_names or [],
            )

            batch = response.get("Messages")
            if batch:
                collected.extend(batch)

                if delete:
                    for received in batch:
                        client.delete_message(
                            QueueUrl=queue_url, ReceiptHandle=received["ReceiptHandle"]
                        )

            return len(collected) >= expected

        if not poll_condition(_poll_once, timeout=timeout):
            raise TimeoutError(
                f"gave up waiting for messages (expected={expected}, actual={len(collected)}"
            )

        return collected

    yield factory
465
@pytest.fixture
def sqs_queue(sqs_create_queue):
    """URL of a freshly created SQS queue (removed again on teardown)."""
    return sqs_create_queue()
470
@pytest.fixture
def sqs_get_queue_arn(aws_client) -> Callable:
    """Returns a resolver that maps an SQS queue URL to its queue ARN."""

    def _get_queue_arn(queue_url: str) -> str:
        attributes = aws_client.sqs.get_queue_attributes(
            QueueUrl=queue_url, AttributeNames=["QueueArn"]
        )
        return attributes["Attributes"]["QueueArn"]

    return _get_queue_arn
480
@pytest.fixture
def sqs_queue_exists(aws_client):
    def _queue_exists(queue_url: str) -> bool:
        """
        Checks whether a queue with the given queue URL exists.
        :param queue_url: the queue URL
        :return: true if the queue exists, false otherwise
        """
        # the queue name is the last path segment of the queue URL
        queue_name = queue_url.split("/")[-1]
        try:
            result = aws_client.sqs.get_queue_url(QueueName=queue_name)
        except ClientError as e:
            if "NonExistentQueue" in e.response["Error"]["Code"]:
                return False
            raise
        return result.get("QueueUrl") == queue_url

    yield _queue_exists
499
@pytest.fixture
def sns_create_topic(aws_client):
    """Factory creating SNS topics that are deleted again on teardown."""
    created_topic_arns = []

    def _create_topic(**kwargs):
        if "Name" not in kwargs:
            kwargs["Name"] = "test-topic-%s" % short_uid()
        response = aws_client.sns.create_topic(**kwargs)
        created_topic_arns.append(response["TopicArn"])
        return response

    yield _create_topic

    for topic_arn in created_topic_arns:
        try:
            aws_client.sns.delete_topic(TopicArn=topic_arn)
        except Exception as e:
            LOG.debug("error cleaning up topic %s: %s", topic_arn, e)
519
@pytest.fixture
def sns_wait_for_topic_delete(aws_client):
    """Returns a helper that blocks (up to 30s) until the given SNS topic no longer exists."""

    def wait_for_topic_delete(topic_arn: str) -> None:
        def wait():
            try:
                aws_client.sns.get_topic_attributes(TopicArn=topic_arn)
                return False
            except ClientError as e:
                # previously `except Exception`: a non-client exception has no
                # `.response` attribute and would raise an AttributeError here
                if "NotFound" in e.response["Error"]["Code"]:
                    return True

                raise

        poll_condition(wait, timeout=30)

    return wait_for_topic_delete
537
@pytest.fixture
def sns_subscription(aws_client):
    """Factory creating SNS subscriptions that are unsubscribed again on teardown."""
    created_sub_arns = []

    def _create_sub(**kwargs):
        # return the full subscription ARN unless the caller explicitly decided otherwise
        if kwargs.get("ReturnSubscriptionArn") is None:
            kwargs["ReturnSubscriptionArn"] = True

        # requires 'TopicArn', 'Protocol', and 'Endpoint'
        response = aws_client.sns.subscribe(**kwargs)
        created_sub_arns.append(response["SubscriptionArn"])
        return response

    yield _create_sub

    for sub_arn in created_sub_arns:
        try:
            aws_client.sns.unsubscribe(SubscriptionArn=sub_arn)
        except Exception as e:
            LOG.debug("error cleaning up subscription %s: %s", sub_arn, e)
560
@pytest.fixture
def sns_topic(sns_create_topic, aws_client):
    """Attributes of a freshly created SNS topic (deleted again on teardown)."""
    arn = sns_create_topic()["TopicArn"]
    return aws_client.sns.get_topic_attributes(TopicArn=arn)
566
@pytest.fixture
def sns_allow_topic_sqs_queue(aws_client):
    """Returns a helper granting an SNS topic permission to send messages to an SQS queue."""

    def _allow_sns_topic(sqs_queue_url, sqs_queue_arn, sns_topic_arn) -> None:
        # queue policy that lets only the given topic write to the queue
        policy = {
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {"Service": "sns.amazonaws.com"},
                    "Action": "sqs:SendMessage",
                    "Resource": sqs_queue_arn,
                    "Condition": {"ArnEquals": {"aws:SourceArn": sns_topic_arn}},
                }
            ]
        }
        aws_client.sqs.set_queue_attributes(
            QueueUrl=sqs_queue_url,
            Attributes={"Policy": json.dumps(policy)},
        )

    return _allow_sns_topic
592
@pytest.fixture
def sns_create_sqs_subscription(sns_allow_topic_sqs_queue, sqs_get_queue_arn, aws_client):
    """Factory subscribing an SQS queue to an SNS topic; returns the subscription attributes."""
    subscription_arns = []

    def _factory(topic_arn: str, queue_url: str, **kwargs) -> Dict[str, str]:
        queue_arn = sqs_get_queue_arn(queue_url)

        # connect the SNS topic to the SQS queue
        subscribe_response = aws_client.sns.subscribe(
            TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn, **kwargs
        )
        subscription_arn = subscribe_response["SubscriptionArn"]

        # grant the topic permission to write to the queue
        sns_allow_topic_sqs_queue(
            sqs_queue_url=queue_url, sqs_queue_arn=queue_arn, sns_topic_arn=topic_arn
        )

        subscription_arns.append(subscription_arn)
        attributes = aws_client.sns.get_subscription_attributes(SubscriptionArn=subscription_arn)
        return attributes["Attributes"]

    yield _factory

    for arn in subscription_arns:
        try:
            aws_client.sns.unsubscribe(SubscriptionArn=arn)
        except Exception as e:
            LOG.error("error cleaning up subscription %s: %s", arn, e)
624
@pytest.fixture
def sns_create_http_endpoint(sns_create_topic, sns_subscription, aws_client):
    """Factory creating an SNS topic subscribed to a local HTTP test server endpoint.

    This fixture can be used with manual setup to expose the HTTPServer fixture to AWS. One
    example is to use a service like localhost.run, set up a specific port to start e.g.
    `HTTPServer(port=40000)`, and tunnel `localhost:40000` to a specific domain that you can
    manually return from this fixture.
    """
    servers = []

    def _create_http_endpoint(
        raw_message_delivery: bool = False,
    ) -> Tuple[str, str, str, HTTPServer]:
        http_server = HTTPServer()
        http_server.start()
        servers.append(http_server)
        http_server.expect_request("/sns-endpoint").respond_with_data(status=200)
        endpoint_url = http_server.url_for("/sns-endpoint")
        wait_for_port_open(endpoint_url)

        topic_arn = sns_create_topic()["TopicArn"]
        subscription = sns_subscription(TopicArn=topic_arn, Protocol="http", Endpoint=endpoint_url)
        subscription_arn = subscription["SubscriptionArn"]
        # disable delivery retries so failures surface immediately in tests
        delivery_policy = {
            "healthyRetryPolicy": {
                "minDelayTarget": 1,
                "maxDelayTarget": 1,
                "numRetries": 0,
                "numNoDelayRetries": 0,
                "numMinDelayRetries": 0,
                "numMaxDelayRetries": 0,
                "backoffFunction": "linear",
            },
            "sicklyRetryPolicy": None,
            "throttlePolicy": {"maxReceivesPerSecond": 1000},
            "guaranteed": False,
        }
        aws_client.sns.set_subscription_attributes(
            SubscriptionArn=subscription_arn,
            AttributeName="DeliveryPolicy",
            AttributeValue=json.dumps(delivery_policy),
        )

        if raw_message_delivery:
            aws_client.sns.set_subscription_attributes(
                SubscriptionArn=subscription_arn,
                AttributeName="RawMessageDelivery",
                AttributeValue="true",
            )

        return topic_arn, subscription_arn, endpoint_url, http_server

    yield _create_http_endpoint

    for server in servers:
        if server.is_running():
            server.stop()
680
@pytest.fixture
def route53_hosted_zone(aws_client):
    """Factory creating Route53 hosted zones that are deleted again on teardown."""
    hosted_zones = []

    def factory(**kwargs):
        if "Name" not in kwargs:
            kwargs["Name"] = f"www.{short_uid()}.com."
        if "CallerReference" not in kwargs:
            kwargs["CallerReference"] = f"caller-ref-{short_uid()}"
        # forward all arguments: previously only Name and CallerReference were
        # passed through, silently dropping e.g. HostedZoneConfig or VPC
        response = aws_client.route53.create_hosted_zone(**kwargs)
        hosted_zones.append(response["HostedZone"]["Id"])
        return response

    yield factory

    for zone in hosted_zones:
        try:
            aws_client.route53.delete_hosted_zone(Id=zone)
        except Exception as e:
            LOG.debug("error cleaning up route53 HostedZone %s: %s", zone, e)
704
@pytest.fixture
def transcribe_create_job(s3_bucket, aws_client):
    """Factory uploading an audio file to S3 and starting a transcription job for it."""
    job_names = []

    def _create_job(audio_file: str, params: Optional[dict[str, Any]] = None) -> str:
        s3_key = "test-clip.wav"

        # note: defaults are written into the caller-supplied dict on purpose
        if not params:
            params = {}

        if "TranscriptionJobName" not in params:
            params["TranscriptionJobName"] = f"test-transcribe-{short_uid()}"
        params.setdefault("LanguageCode", "en-GB")
        if "Media" not in params:
            params["Media"] = {"MediaFileUri": f"s3://{s3_bucket}/{s3_key}"}

        # upload test wav to a s3 bucket
        with open(audio_file, "rb") as f:
            aws_client.s3.upload_fileobj(f, s3_bucket, s3_key)

        response = aws_client.transcribe.start_transcription_job(**params)
        job_name = response["TranscriptionJob"]["TranscriptionJobName"]
        job_names.append(job_name)
        return job_name

    yield _create_job

    for job_name in job_names:
        with contextlib.suppress(ClientError):
            aws_client.transcribe.delete_transcription_job(TranscriptionJobName=job_name)
741
@pytest.fixture
def kinesis_create_stream(aws_client):
    """Factory creating Kinesis streams that are deleted again on teardown."""
    created_streams = []

    def _create_stream(**kwargs):
        if "StreamName" not in kwargs:
            kwargs["StreamName"] = f"test-stream-{short_uid()}"
        stream_name = kwargs["StreamName"]
        aws_client.kinesis.create_stream(**kwargs)
        created_streams.append(stream_name)
        return stream_name

    yield _create_stream

    for stream_name in created_streams:
        try:
            aws_client.kinesis.delete_stream(StreamName=stream_name, EnforceConsumerDeletion=True)
        except Exception as e:
            LOG.debug("error cleaning up kinesis stream %s: %s", stream_name, e)
761
@pytest.fixture
def wait_for_stream_ready(aws_client):
    """Returns a waiter for a Kinesis stream to become usable (ACTIVE or UPDATING)."""

    def _wait_for_stream_ready(stream_name: str):
        def is_stream_ready():
            description = aws_client.kinesis.describe_stream(StreamName=stream_name)
            status = description["StreamDescription"]["StreamStatus"]
            return status in ("ACTIVE", "UPDATING")

        return poll_condition(is_stream_ready)

    return _wait_for_stream_ready
776
@pytest.fixture
def wait_for_delivery_stream_ready(aws_client):
    """Returns a waiter for a Firehose delivery stream to reach the ACTIVE state."""

    def _wait_for_stream_ready(delivery_stream_name: str):
        def is_stream_ready():
            describe_stream_response = aws_client.firehose.describe_delivery_stream(
                DeliveryStreamName=delivery_stream_name
            )
            return (
                describe_stream_response["DeliveryStreamDescription"]["DeliveryStreamStatus"]
                == "ACTIVE"
            )

        # return the poll result (True if the stream became ACTIVE before the
        # timeout) for consistency with wait_for_stream_ready and
        # wait_for_dynamodb_stream_ready; previously the result was discarded
        return poll_condition(is_stream_ready)

    return _wait_for_stream_ready
793
@pytest.fixture
def wait_for_dynamodb_stream_ready(aws_client):
    """Returns a waiter for a DynamoDB stream to reach the ENABLED state."""

    def _wait_for_stream_ready(stream_arn: str):
        def is_stream_ready():
            description = aws_client.dynamodbstreams.describe_stream(StreamArn=stream_arn)
            return description["StreamDescription"]["StreamStatus"] == "ENABLED"

        return poll_condition(is_stream_ready)

    return _wait_for_stream_ready
807
@pytest.fixture()
def kms_create_key(aws_client_factory):
    """Factory creating KMS keys; created keys are scheduled for deletion on teardown."""
    key_ids = []

    def _create_key(region_name: Optional[str] = None, **kwargs):
        # `region_name` is optional; marked as such (previously the implicit
        # `str = None`, which PEP 484 disallows)
        if "Description" not in kwargs:
            kwargs["Description"] = f"test description - {short_uid()}"
        key_metadata = aws_client_factory(region_name=region_name).kms.create_key(**kwargs)[
            "KeyMetadata"
        ]
        key_ids.append((region_name, key_metadata["KeyId"]))
        return key_metadata

    yield _create_key

    for region_name, key_id in key_ids:
        try:
            # shortest amount of time you can schedule the deletion
            aws_client_factory(region_name=region_name).kms.schedule_key_deletion(
                KeyId=key_id, PendingWindowInDays=7
            )
        except Exception as e:
            exception_message = str(e)
            # Some tests schedule their keys for deletion themselves; in that
            # case the key is already pending deletion and we skip the log.
            if (
                "KMSInvalidStateException" not in exception_message
                or "is pending deletion" not in exception_message
            ):
                LOG.debug("error cleaning up KMS key %s: %s", key_id, e)
838
@pytest.fixture()
def kms_replicate_key(aws_client_factory):
    """Factory fixture replicating KMS keys to another region; replicas are cleaned up."""
    created_replicas = []

    def _replicate_key(region_from=None, **kwargs):
        target_region = kwargs.get("ReplicaRegion")
        created_replicas.append((target_region, kwargs.get("KeyId")))
        return aws_client_factory(region_name=region_from).kms.replicate_key(**kwargs)

    yield _replicate_key

    for target_region, key_id in created_replicas:
        try:
            # 7 days is the shortest pending window KMS allows for deletion
            aws_client_factory(region_name=target_region).kms.schedule_key_deletion(
                KeyId=key_id, PendingWindowInDays=7
            )
        except Exception as e:
            LOG.debug("error cleaning up KMS key %s: %s", key_id, e)
×
857

858

859
# kms_create_key fixture is used here not just to be able to create aliases without a key specified,
# but also to make sure that kms_create_key gets executed before and teared down after kms_create_alias -
# to make sure that we clean up aliases before keys get cleaned up.
@pytest.fixture()
def kms_create_alias(kms_create_key, aws_client):
    """Factory fixture creating KMS aliases; deletes created aliases on teardown."""
    created_aliases = []

    def _create_alias(**kwargs):
        if "AliasName" not in kwargs:
            kwargs["AliasName"] = f"alias/{short_uid()}"
        if "TargetKeyId" not in kwargs:
            # creating a key here is a side effect, so only do it when needed
            kwargs["TargetKeyId"] = kms_create_key()["KeyId"]

        aws_client.kms.create_alias(**kwargs)
        alias_name = kwargs["AliasName"]
        created_aliases.append(alias_name)
        return alias_name

    yield _create_alias

    for alias_name in created_aliases:
        try:
            aws_client.kms.delete_alias(AliasName=alias_name)
        except Exception as e:
            LOG.debug("error cleaning up KMS alias %s: %s", alias_name, e)
1✔
883

884

885
@pytest.fixture()
def kms_create_grant(kms_create_key, aws_client):
    """Factory fixture creating KMS grants; retires created grants on teardown."""
    created_grants = []

    def _create_grant(**kwargs):
        # Just a random ARN, since KMS in LocalStack currently doesn't validate GranteePrincipal,
        # but some GranteePrincipal is required to create a grant.
        default_grantee_principal = (
            "arn:aws:kms:eu-central-1:123456789876:key/198a5a78-52c3-489f-ac70-b06a4d11027a"
        )

        if "Operations" not in kwargs:
            kwargs["Operations"] = ["Decrypt", "Encrypt"]
        if "GranteePrincipal" not in kwargs:
            kwargs["GranteePrincipal"] = default_grantee_principal
        if "KeyId" not in kwargs:
            kwargs["KeyId"] = kms_create_key()["KeyId"]

        grant_id = aws_client.kms.create_grant(**kwargs)["GrantId"]
        created_grants.append((grant_id, kwargs["KeyId"]))
        return grant_id, kwargs["KeyId"]

    yield _create_grant

    for grant_id, key_id in created_grants:
        try:
            aws_client.kms.retire_grant(GrantId=grant_id, KeyId=key_id)
        except Exception as e:
            LOG.debug("error cleaning up KMS grant %s: %s", grant_id, e)
×
914

915

916
@pytest.fixture
def kms_key(kms_create_key):
    """A single KMS key for the test, cleaned up via the kms_create_key factory."""
    return kms_create_key()
1✔
919

920

921
@pytest.fixture
def kms_grant_and_key(kms_key, aws_client):
    """Returns a [grant, key] pair where the grant lets the current caller use the key."""
    caller_arn = aws_client.sts.get_caller_identity()["Arn"]
    grant = aws_client.kms.create_grant(
        KeyId=kms_key["KeyId"],
        GranteePrincipal=caller_arn,
        Operations=["Decrypt", "Encrypt"],
    )
    return [grant, kms_key]
933

934

935
@pytest.fixture
def opensearch_wait_for_cluster(aws_client):
    """Fixture returning a helper that waits until an OpenSearch domain is reachable."""

    def _wait_for_cluster(domain_name: str):
        def finished_processing():
            status = aws_client.opensearch.describe_domain(DomainName=domain_name)["DomainStatus"]
            # ready once the domain stopped processing changes and exposes an endpoint
            return status["Processing"] is False and "Endpoint" in status

        # against AWS, domain creation can take very long; poll less aggressively there
        poll_kwargs = {"interval": 10} if is_aws_cloud() else {}
        assert poll_condition(finished_processing, timeout=25 * 60, **poll_kwargs), (
            f"could not start domain: {domain_name}"
        )

    return _wait_for_cluster
1✔
947

948

949
@pytest.fixture
def opensearch_create_domain(opensearch_wait_for_cluster, aws_client):
    """Factory fixture creating OpenSearch domains; deletes them on teardown."""
    created_domains = []

    def factory(**kwargs) -> str:
        if "DomainName" not in kwargs:
            kwargs["DomainName"] = f"test-domain-{short_uid()}"
        domain_name = kwargs["DomainName"]

        aws_client.opensearch.create_domain(**kwargs)
        # block until the cluster is actually usable
        opensearch_wait_for_cluster(domain_name=domain_name)

        created_domains.append(domain_name)
        return domain_name

    yield factory

    # cleanup
    for domain_name in created_domains:
        try:
            aws_client.opensearch.delete_domain(DomainName=domain_name)
        except Exception as e:
            LOG.debug("error cleaning up domain %s: %s", domain_name, e)
×
972

973

974
@pytest.fixture
def opensearch_domain(opensearch_create_domain) -> str:
    """Name of a freshly created OpenSearch domain."""
    return opensearch_create_domain()
1✔
977

978

979
@pytest.fixture
def opensearch_endpoint(opensearch_domain, aws_client) -> str:
    """HTTPS endpoint URL of the test OpenSearch domain."""
    domain_status = aws_client.opensearch.describe_domain(DomainName=opensearch_domain)[
        "DomainStatus"
    ]
    assert "Endpoint" in domain_status
    return f"https://{domain_status['Endpoint']}"
1✔
984

985

986
@pytest.fixture
def opensearch_document_path(opensearch_endpoint, aws_client):
    """Indexes a sample document into the test domain and returns the document URL."""
    document = {
        "first_name": "Boba",
        "last_name": "Fett",
        "age": 41,
        "about": "I'm just a simple man, trying to make my way in the universe.",
        "interests": ["mandalorian armor", "tusken culture"],
    }
    document_path = f"{opensearch_endpoint}/bountyhunters/_doc/1"
    put_headers = {"content-type": "application/json", "Accept-encoding": "identity"}
    response = requests.put(document_path, data=json.dumps(document), headers=put_headers)
    assert response.status_code == 201, f"could not create document at: {document_path}"
    return document_path
1✔
1003

1004

1005
# Cleanup fixtures
@pytest.fixture
def cleanup_stacks(aws_client):
    """Fixture returning a helper that deletes CFN stacks and waits for their deletion."""

    def _cleanup_stacks(stacks: List[str]) -> None:
        for stack in ensure_list(stacks):
            try:
                aws_client.cloudformation.delete_stack(StackName=stack)
                aws_client.cloudformation.get_waiter("stack_delete_complete").wait(StackName=stack)
            except Exception:
                LOG.debug("Failed to cleanup stack '%s'", stack)

    return _cleanup_stacks
1✔
1018

1019

1020
@pytest.fixture
def cleanup_changesets(aws_client):
    """Fixture returning a helper that deletes CFN change sets, best-effort."""

    def _cleanup_changesets(changesets: List[str]) -> None:
        for changeset in ensure_list(changesets):
            try:
                aws_client.cloudformation.delete_change_set(ChangeSetName=changeset)
            except Exception:
                LOG.debug("Failed to cleanup changeset '%s'", changeset)

    return _cleanup_changesets
1✔
1031

1032

1033
# Helpers for Cfn


# TODO: exports(!)
@dataclasses.dataclass(frozen=True)
class DeployResult:
    """Immutable result of a stack deployment performed via the deploy_cfn_template fixture."""

    change_set_id: str
    stack_id: str
    stack_name: str
    change_set_name: str
    # stack outputs mapped as OutputKey -> OutputValue
    outputs: Dict[str, str]

    # deletes the stack and waits for the deletion to complete
    destroy: Callable[[], None]
1✔
1046

1047

1048
class StackDeployError(Exception):
    """Raised when a CloudFormation deployment fails; message summarizes stack events."""

    def __init__(self, describe_res: dict, events: list[dict]):
        self.describe_result = describe_res
        self.events = events

        encoded_describe_output = json.dumps(self.describe_result, cls=CustomEncoder)
        event_summary = self.format_events(events)
        if config.CFN_VERBOSE_ERRORS:
            msg = f"Describe output:\n{encoded_describe_output}\nEvents:\n{event_summary}"
        else:
            msg = f"Describe output:\n{encoded_describe_output}\nFailing resources:\n{event_summary}"

        super().__init__(msg)

    def format_events(self, events: list[dict]) -> str:
        """Render failed events (or all events in verbose mode) in chronological order."""
        chronological_events = sorted(events, key=lambda event: event["Timestamp"])
        return "\n".join(
            self.format_event(event)
            for event in chronological_events
            if event["ResourceStatus"].endswith("FAILED") or config.CFN_VERBOSE_ERRORS
        )

    @staticmethod
    def format_event(event: dict) -> str:
        """Render a single stack event as a one-line summary."""
        prefix = f"- {event['LogicalResourceId']} ({event['ResourceType']}) -> {event['ResourceStatus']}"
        reason = event.get("ResourceStatusReason")
        if reason:
            flat_reason = reason.replace("\n", "; ")
            return f"{prefix} ({flat_reason})"
        return prefix
×
1078

1079

1080
@pytest.fixture
def deploy_cfn_template(
    aws_client: ServiceLevelClientFactory,
):
    """Factory fixture deploying CloudFormation templates via change sets.

    Deployed stacks are destroyed on teardown in reverse creation order.
    """
    # (stack_id, destroy-callable) per deployed stack, in creation order
    state: list[tuple[str, Callable]] = []

    def _deploy(
        *,
        is_update: Optional[bool] = False,
        stack_name: Optional[str] = None,
        change_set_name: Optional[str] = None,
        template: Optional[str] = None,
        template_path: Optional[str | os.PathLike] = None,
        template_mapping: Optional[Dict[str, Any]] = None,
        parameters: Optional[Dict[str, str]] = None,
        role_arn: Optional[str] = None,
        max_wait: Optional[int] = None,
        delay_between_polls: Optional[int] = 2,
        custom_aws_client: Optional[ServiceLevelClientFactory] = None,
    ) -> DeployResult:
        """Create and execute a change set, then wait for the stack to stabilize.

        :param is_update: update an existing stack (requires stack_name)
        :param template: inline template body; ignored if template_path is given
        :param template_path: path to a template file loaded via load_template_file
        :param template_mapping: values substituted into the template by render_template
        :param parameters: CFN parameters as key -> value
        :param role_arn: optional CFN service role
        :param max_wait: overall timeout in seconds (default: 1800 on AWS, 180 otherwise)
        :param custom_aws_client: client factory override (e.g. for another account/region)
        :raises StackDeployError: when the stack waiter reports a failure
        """
        if is_update:
            assert stack_name
        stack_name = stack_name or f"stack-{short_uid()}"
        change_set_name = change_set_name or f"change-set-{short_uid()}"

        if max_wait is None:
            max_wait = 1800 if is_aws_cloud() else 180

        if template_path is not None:
            template = load_template_file(template_path)
        template_rendered = render_template(template, **(template_mapping or {}))

        kwargs = dict(
            StackName=stack_name,
            ChangeSetName=change_set_name,
            TemplateBody=template_rendered,
            Capabilities=["CAPABILITY_AUTO_EXPAND", "CAPABILITY_IAM", "CAPABILITY_NAMED_IAM"],
            ChangeSetType=("UPDATE" if is_update else "CREATE"),
            Parameters=[
                {
                    "ParameterKey": k,
                    "ParameterValue": v,
                }
                for (k, v) in (parameters or {}).items()
            ],
        )
        if role_arn is not None:
            kwargs["RoleARN"] = role_arn

        cfn_aws_client = custom_aws_client if custom_aws_client is not None else aws_client

        response = cfn_aws_client.cloudformation.create_change_set(**kwargs)

        change_set_id = response["Id"]
        stack_id = response["StackId"]

        # wait for the change set to be created before executing it
        cfn_aws_client.cloudformation.get_waiter(WAITER_CHANGE_SET_CREATE_COMPLETE).wait(
            ChangeSetName=change_set_id
        )
        cfn_aws_client.cloudformation.execute_change_set(ChangeSetName=change_set_id)
        stack_waiter = cfn_aws_client.cloudformation.get_waiter(
            WAITER_STACK_UPDATE_COMPLETE if is_update else WAITER_STACK_CREATE_COMPLETE
        )

        try:
            stack_waiter.wait(
                StackName=stack_id,
                WaiterConfig={
                    "Delay": delay_between_polls,
                    "MaxAttempts": max_wait / delay_between_polls,
                },
            )
        except botocore.exceptions.WaiterError as e:
            # surface describe output + stack events for debugging the failure
            raise StackDeployError(
                cfn_aws_client.cloudformation.describe_stacks(StackName=stack_id)["Stacks"][0],
                cfn_aws_client.cloudformation.describe_stack_events(StackName=stack_id)[
                    "StackEvents"
                ],
            ) from e

        describe_stack_res = cfn_aws_client.cloudformation.describe_stacks(StackName=stack_id)[
            "Stacks"
        ][0]
        outputs = describe_stack_res.get("Outputs", [])

        mapped_outputs = {o["OutputKey"]: o.get("OutputValue") for o in outputs}

        def _destroy_stack():
            # delete the stack and block until the deletion is complete
            cfn_aws_client.cloudformation.delete_stack(StackName=stack_id)
            cfn_aws_client.cloudformation.get_waiter(WAITER_STACK_DELETE_COMPLETE).wait(
                StackName=stack_id,
                WaiterConfig={
                    "Delay": delay_between_polls,
                    "MaxAttempts": max_wait / delay_between_polls,
                },
            )

        state.append((stack_id, _destroy_stack))

        return DeployResult(
            change_set_id, stack_id, stack_name, change_set_name, mapped_outputs, _destroy_stack
        )

    yield _deploy

    # delete the stacks in the reverse order they were created in case of inter-stack dependencies
    for stack_id, teardown in state[::-1]:
        try:
            teardown()
        except Exception as e:
            LOG.debug("Failed cleaning up stack stack_id=%s: %s", stack_id, e)
1✔
1191

1192

1193
@pytest.fixture
def is_change_set_created_and_available(aws_client):
    """Poll-predicate factory: change set is CREATE_COMPLETE and ready to execute."""

    def _is_change_set_created_and_available(change_set_id: str):
        def _inner():
            change_set = aws_client.cloudformation.describe_change_set(ChangeSetName=change_set_id)
            # TODO: CREATE_FAILED should also not lead to further retries
            created = change_set.get("Status") == "CREATE_COMPLETE"
            available = change_set.get("ExecutionStatus") == "AVAILABLE"
            return created and available

        return _inner

    return _is_change_set_created_and_available
1✔
1207

1208

1209
@pytest.fixture
def is_change_set_failed_and_unavailable(aws_client):
    """Poll-predicate factory: change set creation FAILED and it cannot be executed."""

    # fix: inner helper was copy-paste-named "_is_change_set_created_and_available",
    # which contradicted the FAILED/UNAVAILABLE state it actually checks
    def _is_change_set_failed_and_unavailable(change_set_id: str):
        def _inner():
            change_set = aws_client.cloudformation.describe_change_set(ChangeSetName=change_set_id)
            return (
                change_set.get("Status") == "FAILED"
                and change_set.get("ExecutionStatus") == "UNAVAILABLE"
            )

        return _inner

    return _is_change_set_failed_and_unavailable
×
1223

1224

1225
@pytest.fixture
def is_stack_created(aws_client):
    """Poll-predicate factory: stack creation reached a terminal state."""
    return _has_stack_status(aws_client.cloudformation, ["CREATE_COMPLETE", "CREATE_FAILED"])
1✔
1228

1229

1230
@pytest.fixture
def is_stack_updated(aws_client):
    """Poll-predicate factory: stack update reached a terminal state."""
    return _has_stack_status(aws_client.cloudformation, ["UPDATE_COMPLETE", "UPDATE_FAILED"])
×
1233

1234

1235
@pytest.fixture
def is_stack_deleted(aws_client):
    """Poll-predicate factory: stack deletion completed."""
    return _has_stack_status(aws_client.cloudformation, ["DELETE_COMPLETE"])
×
1238

1239

1240
def _has_stack_status(cfn_client, statuses: List[str]):
1✔
1241
    def _has_status(stack_id: str):
1✔
1242
        def _inner():
1✔
1243
            resp = cfn_client.describe_stacks(StackName=stack_id)
1✔
1244
            s = resp["Stacks"][0]  # since the lookup  uses the id we can only get a single response
1✔
1245
            return s.get("StackStatus") in statuses
1✔
1246

1247
        return _inner
1✔
1248

1249
    return _has_status
1✔
1250

1251

1252
@pytest.fixture
def is_change_set_finished(aws_client):
    """Poll-predicate factory: change set executed; short-circuits on EXECUTE_FAILED."""

    def _is_change_set_finished(change_set_id: str, stack_name: Optional[str] = None):
        def _inner():
            describe_kwargs = {"ChangeSetName": change_set_id}
            if stack_name:
                describe_kwargs["StackName"] = stack_name

            change_set = aws_client.cloudformation.describe_change_set(**describe_kwargs)
            execution_status = change_set.get("ExecutionStatus")

            if execution_status == "EXECUTE_FAILED":
                LOG.warning("Change set failed")
                # terminal failure state - abort polling instead of retrying
                raise ShortCircuitWaitException()

            return execution_status == "EXECUTE_COMPLETE"

        return _inner

    return _is_change_set_finished
1✔
1271

1272

1273
@pytest.fixture
def wait_until_lambda_ready(aws_client):
    """Fixture returning a helper that blocks until a Lambda function leaves the Pending state.

    :param function_name: name of the function to wait for
    :param qualifier: optional version/alias qualifier to check
    :param client: optional Lambda client override (defaults to aws_client.lambda_)
    """

    def _wait_until_ready(function_name: str, qualifier: str = None, client=None):
        client = client or aws_client.lambda_

        def _is_not_pending():
            kwargs = {}
            if qualifier:
                kwargs["Qualifier"] = qualifier
            try:
                # fix: kwargs (carrying the Qualifier) was built but never passed
                # to get_function, so the qualifier argument was silently ignored
                result = (
                    client.get_function(FunctionName=function_name, **kwargs)["Configuration"][
                        "State"
                    ]
                    != "Pending"
                )
                LOG.debug("lambda state result: result=%s", result)
                return result
            except Exception as e:
                LOG.error(e)
                raise

        wait_until(_is_not_pending)

    return _wait_until_ready
1✔
1296

1297

1298
role_assume_policy = """
1✔
1299
{
1300
  "Version": "2012-10-17",
1301
  "Statement": [
1302
    {
1303
      "Effect": "Allow",
1304
      "Principal": {
1305
        "Service": "lambda.amazonaws.com"
1306
      },
1307
      "Action": "sts:AssumeRole"
1308
    }
1309
  ]
1310
}
1311
""".strip()
1312

1313
role_policy = """
1✔
1314
{
1315
    "Version": "2012-10-17",
1316
    "Statement": [
1317
        {
1318
            "Effect": "Allow",
1319
            "Action": [
1320
                "logs:CreateLogGroup",
1321
                "logs:CreateLogStream",
1322
                "logs:PutLogEvents"
1323
            ],
1324
            "Resource": [
1325
                "*"
1326
            ]
1327
        }
1328
    ]
1329
}
1330
""".strip()
1331

1332

1333
@pytest.fixture
def create_lambda_function_aws(aws_client):
    """Factory fixture creating Lambda functions via the raw API; deletes them on teardown."""
    created_arns = []

    def _create_lambda_function(**kwargs):
        def _create_function():
            response = aws_client.lambda_.create_function(**kwargs)
            created_arns.append(response["FunctionArn"])

            def _is_not_pending():
                try:
                    state = aws_client.lambda_.get_function(FunctionName=response["FunctionName"])[
                        "Configuration"
                    ]["State"]
                    return state != "Pending"
                except Exception as e:
                    LOG.error(e)
                    raise

            wait_until(_is_not_pending)
            return response

        # @AWS, takes about 10s until the role/policy is "active", until then it will fail
        # localstack should normally not require the retries and will just continue here
        return retry(_create_function, retries=3, sleep=4)

    yield _create_lambda_function

    for function_arn in created_arns:
        try:
            aws_client.lambda_.delete_function(FunctionName=function_arn)
        except Exception:
            LOG.debug("Unable to delete function arn=%s in cleanup", function_arn)
×
1369

1370

1371
@pytest.fixture
def create_lambda_function(aws_client, wait_until_lambda_ready, lambda_su_role):
    """Factory fixture creating Lambda functions via testutil.

    Cleans up the created functions and their CloudWatch log groups on teardown.
    """
    # (function_arn, client used to create it) so cleanup targets the right account/region
    lambda_arns_and_clients = []
    log_groups = []
    lambda_client = aws_client.lambda_
    logs_client = aws_client.logs
    s3_client = aws_client.s3

    def _create_lambda_function(*args, **kwargs):
        # callers may pass a custom client (e.g. for a secondary account)
        client = kwargs.get("client") or lambda_client
        kwargs["client"] = client
        kwargs["s3_client"] = s3_client
        func_name = kwargs.get("func_name")
        assert func_name
        del kwargs["func_name"]

        if not kwargs.get("role"):
            # default to the shared superuser role fixture
            kwargs["role"] = lambda_su_role

        def _create_function():
            resp = testutil.create_lambda_function(func_name, **kwargs)
            lambda_arns_and_clients.append((resp["CreateFunctionResponse"]["FunctionArn"], client))
            wait_until_lambda_ready(function_name=func_name, client=client)
            log_group_name = f"/aws/lambda/{func_name}"
            log_groups.append(log_group_name)
            return resp

        # @AWS, takes about 10s until the role/policy is "active", until then it will fail
        # localstack should normally not require the retries and will just continue here
        return retry(_create_function, retries=3, sleep=4)

    yield _create_lambda_function

    for arn, client in lambda_arns_and_clients:
        try:
            client.delete_function(FunctionName=arn)
        except Exception:
            LOG.debug("Unable to delete function arn=%s in cleanup", arn)

    for log_group_name in log_groups:
        try:
            logs_client.delete_log_group(logGroupName=log_group_name)
        except Exception:
            LOG.debug("Unable to delete log group %s in cleanup", log_group_name)
1✔
1415

1416

1417
@pytest.fixture
def create_echo_http_server(aws_client, create_lambda_function):
    """Factory fixture deploying a Lambda-backed HTTP echo server exposed via a function URL."""
    from localstack.aws.api.lambda_ import Runtime

    lambda_client = aws_client.lambda_
    # handler source deployed into the echo Lambda; mirrors the incoming request back as JSON
    handler_code = textwrap.dedent(
        """
    import json
    import os


    def make_response(body: dict, status_code: int = 200):
        return {
            "statusCode": status_code,
            "headers": {"Content-Type": "application/json"},
            "body": body,
        }


    def trim_headers(headers):
        if not int(os.getenv("TRIM_X_HEADERS", 0)):
            return headers
        return {
            key: value for key, value in headers.items()
            if not (key.startswith("x-amzn") or key.startswith("x-forwarded-"))
        }


    def handler(event, context):
        print(json.dumps(event))
        response = {
            "args": event.get("queryStringParameters", {}),
            "data": event.get("body", ""),
            "domain": event["requestContext"].get("domainName", ""),
            "headers": trim_headers(event.get("headers", {})),
            "method": event["requestContext"]["http"].get("method", ""),
            "origin": event["requestContext"]["http"].get("sourceIp", ""),
            "path": event["requestContext"]["http"].get("path", ""),
        }
        return make_response(response)"""
    )

    def _create_echo_http_server(trim_x_headers: bool = False) -> str:
        """Creates a server that will echo any request. Any request will be returned with the
        following format. Any unset values will have those defaults.
        `trim_x_headers` can be set to True to trim some headers that are automatically added by lambda in
        order to create easier Snapshot testing. Default: `False`
        {
          "args": {},
          "headers": {},
          "data": "",
          "method": "",
          "domain": "",
          "origin": "",
          "path": ""
        }"""
        zip_file = testutil.create_lambda_archive(handler_code, get_content=True)
        func_name = f"echo-http-{short_uid()}"
        create_lambda_function(
            func_name=func_name,
            zip_file=zip_file,
            runtime=Runtime.python3_12,
            envvars={"TRIM_X_HEADERS": "1" if trim_x_headers else "0"},
        )
        url_response = lambda_client.create_function_url_config(
            FunctionName=func_name, AuthType="NONE"
        )
        # allow unauthenticated invocation of the function URL
        aws_client.lambda_.add_permission(
            FunctionName=func_name,
            StatementId="urlPermission",
            Action="lambda:InvokeFunctionUrl",
            Principal="*",
            FunctionUrlAuthType="NONE",
        )
        return url_response["FunctionUrl"]

    yield _create_echo_http_server
1✔
1494

1495

1496
@pytest.fixture
def create_event_source_mapping(aws_client):
    """Factory fixture creating Lambda event source mappings; deletes them on teardown."""
    created_uuids = []

    def _create_event_source_mapping(*args, **kwargs):
        response = aws_client.lambda_.create_event_source_mapping(*args, **kwargs)
        created_uuids.append(response["UUID"])
        return response

    yield _create_event_source_mapping

    for mapping_uuid in created_uuids:
        try:
            aws_client.lambda_.delete_event_source_mapping(UUID=mapping_uuid)
        except Exception:
            LOG.debug("Unable to delete event source mapping %s in cleanup", mapping_uuid)
×
1512

1513

1514
@pytest.fixture
def check_lambda_logs(aws_client):
    """Fixture returning a helper that asserts expected lines appear in a function's logs."""

    def _check_logs(func_name: str, expected_lines: List[str] = None) -> List[str]:
        log_events = get_lambda_logs(func_name, logs_client=aws_client.logs)
        log_messages = [event["message"] for event in log_events]
        for expected in expected_lines or []:
            # lines containing ".*" are treated as regex patterns over whole messages
            if ".*" in expected and any(
                re.match(expected, message, flags=re.DOTALL) for message in log_messages
            ):
                continue
            assert expected in log_messages
        return log_messages

    return _check_logs
1✔
1530

1531

1532
@pytest.fixture
def create_policy(aws_client):
    """Factory fixture creating IAM policies; deletes them on teardown."""
    created_policies = []

    def _create_policy(*args, iam_client=None, **kwargs):
        iam_client = iam_client or aws_client.iam
        if "PolicyName" not in kwargs:
            kwargs["PolicyName"] = f"policy-{short_uid()}"
        response = iam_client.create_policy(*args, **kwargs)
        # remember the client too, so cleanup hits the same account/region
        created_policies.append((response["Policy"]["Arn"], iam_client))
        return response

    yield _create_policy

    for policy_arn, iam_client in created_policies:
        try:
            iam_client.delete_policy(PolicyArn=policy_arn)
        except Exception:
            LOG.debug("Could not delete policy '%s' during test cleanup", policy_arn)
1✔
1552

1553

1554
@pytest.fixture
def create_user(aws_client):
    """Factory fixture creating IAM users.

    Teardown removes inline policies, detaches managed policies and deletes
    access keys before deleting the user, since IAM requires a user to be
    empty before deletion.
    """
    usernames = []

    def _create_user(**kwargs):
        if "UserName" not in kwargs:
            kwargs["UserName"] = f"user-{short_uid()}"
        response = aws_client.iam.create_user(**kwargs)
        usernames.append(response["User"]["UserName"])
        return response

    yield _create_user

    for username in usernames:
        try:
            inline_policies = aws_client.iam.list_user_policies(UserName=username)["PolicyNames"]
        except ClientError as e:
            # user is already gone (e.g. the test deleted it itself) - nothing to clean up
            LOG.debug(
                "Cannot list user policies: %s. User %s probably already deleted...", e, username
            )
            continue

        # 1) delete inline policies
        for inline_policy in inline_policies:
            try:
                aws_client.iam.delete_user_policy(UserName=username, PolicyName=inline_policy)
            except Exception:
                LOG.debug(
                    "Could not delete user policy '%s' from '%s' during cleanup",
                    inline_policy,
                    username,
                )
        # 2) detach managed policies
        attached_policies = aws_client.iam.list_attached_user_policies(UserName=username)[
            "AttachedPolicies"
        ]
        for attached_policy in attached_policies:
            try:
                aws_client.iam.detach_user_policy(
                    UserName=username, PolicyArn=attached_policy["PolicyArn"]
                )
            except Exception:
                LOG.debug(
                    "Error detaching policy '%s' from user '%s'",
                    attached_policy["PolicyArn"],
                    username,
                )
        # 3) delete access keys
        access_keys = aws_client.iam.list_access_keys(UserName=username)["AccessKeyMetadata"]
        for access_key in access_keys:
            try:
                aws_client.iam.delete_access_key(
                    UserName=username, AccessKeyId=access_key["AccessKeyId"]
                )
            except Exception:
                LOG.debug(
                    "Error deleting access key '%s' from user '%s'",
                    access_key["AccessKeyId"],
                    username,
                )

        # 4) finally delete the (now empty) user
        try:
            aws_client.iam.delete_user(UserName=username)
        except Exception as e:
            LOG.debug("Error deleting user '%s' during test cleanup: %s", username, e)
1✔
1616

1617

1618
@pytest.fixture
def wait_and_assume_role(aws_client):
    """Factory fixture returning a helper that assumes an IAM role with retries.

    Against real AWS a freshly created role cannot be assumed immediately
    (IAM is eventually consistent), hence the retry loop.
    """

    def _wait_and_assume_role(role_arn: str, session_name: Optional[str] = None, **kwargs):
        """
        :param role_arn: ARN of the role to assume
        :param session_name: session name (autogenerated if omitted)
        :param kwargs: additional arguments passed to sts.assume_role
        :return: the temporary credentials dict returned by STS
        """
        if not session_name:
            session_name = f"session-{short_uid()}"

        def assume_role():
            return aws_client.sts.assume_role(
                RoleArn=role_arn, RoleSessionName=session_name, **kwargs
            )["Credentials"]

        # need to retry a couple of times before we are allowed to assume this role in AWS
        keys = retry(assume_role, sleep=5, retries=4)
        return keys

    return _wait_and_assume_role
1634

1635

1636
@pytest.fixture
def create_role(aws_client):
    """Factory fixture creating IAM roles; best-effort deletes them on teardown.

    The factory accepts an optional ``iam_client`` (e.g. for a secondary
    account) plus any ``create_role`` keyword arguments; a RoleName is
    autogenerated if omitted. Teardown detaches managed policies and deletes
    inline policies first, since a role with policies cannot be deleted.
    """
    # (role_name, iam_client) pairs, so each role is cleaned up via the client that created it
    role_names = []

    def _create_role(iam_client=None, **kwargs):
        if not kwargs.get("RoleName"):
            kwargs["RoleName"] = f"role-{short_uid()}"
        iam_client = iam_client or aws_client.iam
        result = iam_client.create_role(**kwargs)
        role_names.append((result["Role"]["RoleName"], iam_client))
        return result

    yield _create_role

    for role_name, iam_client in role_names:
        # detach policies
        try:
            attached_policies = iam_client.list_attached_role_policies(RoleName=role_name)[
                "AttachedPolicies"
            ]
        except ClientError as e:
            # role is already gone (e.g. deleted by the test itself) -> nothing to clean up
            LOG.debug(
                "Cannot list attached role policies: %s. Role %s probably already deleted...",
                e,
                role_name,
            )
            continue
        for attached_policy in attached_policies:
            try:
                iam_client.detach_role_policy(
                    RoleName=role_name, PolicyArn=attached_policy["PolicyArn"]
                )
            except Exception:
                LOG.debug(
                    "Could not detach role policy '%s' from '%s' during cleanup",
                    attached_policy["PolicyArn"],
                    role_name,
                )
        # inline policies must be removed as well before the role can be deleted
        role_policies = iam_client.list_role_policies(RoleName=role_name)["PolicyNames"]
        for role_policy in role_policies:
            try:
                iam_client.delete_role_policy(RoleName=role_name, PolicyName=role_policy)
            except Exception:
                LOG.debug(
                    "Could not delete role policy '%s' from '%s' during cleanup",
                    role_policy,
                    role_name,
                )
        try:
            iam_client.delete_role(RoleName=role_name)
        except Exception:
            LOG.debug("Could not delete role '%s' during cleanup", role_name)
1688

1689

1690
@pytest.fixture
def create_parameter(aws_client):
    """Factory fixture for SSM parameters; deletes every created parameter on teardown."""
    created_names = []

    def _create_parameter(**kwargs):
        created_names.append(kwargs["Name"])
        return aws_client.ssm.put_parameter(**kwargs)

    yield _create_parameter

    for name in created_names:
        aws_client.ssm.delete_parameter(Name=name)
1702

1703

1704
@pytest.fixture
def create_secret(aws_client):
    """Factory fixture for Secrets Manager secrets; force-deletes them on teardown."""
    items = []

    # renamed from "_create_parameter": the previous name was a copy-paste
    # leftover from the SSM parameter fixture above
    def _create_secret(**kwargs):
        create_response = aws_client.secretsmanager.create_secret(**kwargs)
        items.append(create_response["ARN"])
        return create_response

    yield _create_secret

    for item in items:
        # ForceDeleteWithoutRecovery skips the default recovery window so the
        # name is immediately reusable by subsequent tests
        aws_client.secretsmanager.delete_secret(SecretId=item, ForceDeleteWithoutRecovery=True)
1717

1718

1719
# TODO Figure out how to make cert creation tests pass against AWS.
#
# We would like localstack tests to pass not just against localstack, but also against AWS to make sure
# our emulation is correct. Unfortunately, with certificate creation there are some issues.
#
# In AWS, newly created ACM certificates have to be validated either by email or by DNS. The latter is done
# by adding some CNAME records as requested by AWS in response to a certificate request.
# For testing purposes the DNS option seems to be easier, at least as long as DNS is handled by the Route53 AWS DNS service.
#
# The other possible option is to use IAM certificates instead of ACM ones. Those just have to be uploaded from files
# created by openssl etc. Not sure if there are other issues after that.
#
# The third option might be having some certificates created in advance in AWS - so they do not require validation
# and can be easily used in tests. The issue with such an approach is that for AppSync, for example, in order to
# register a domain name (https://docs.aws.amazon.com/appsync/latest/APIReference/API_CreateDomainName.html),
# the domain name in the API request has to match the domain name used in certificate creation. Which means that with
# pre-created certificates we would have to use specific domain names instead of random ones.
1736
@pytest.fixture
def acm_request_certificate(aws_client_factory):
    """Factory fixture requesting ACM certificates; deletes them on teardown.

    The factory accepts ``request_certificate`` keyword arguments plus an
    optional ``region_name``; a DomainName is autogenerated if omitted.
    """
    # (certificate_arn, region_name) pairs so cleanup uses the matching regional client
    certificate_arns = []

    def factory(**kwargs) -> dict:
        # NOTE: return annotation fixed from `str` - this returns the full
        # RequestCertificate response, not just the ARN
        if "DomainName" not in kwargs:
            kwargs["DomainName"] = f"test-domain-{short_uid()}.localhost.localstack.cloud"

        region_name = kwargs.pop("region_name", None)
        acm_client = aws_client_factory(region_name=region_name).acm

        response = acm_client.request_certificate(**kwargs)
        created_certificate_arn = response["CertificateArn"]
        certificate_arns.append((created_certificate_arn, region_name))
        return response

    yield factory

    # cleanup
    for certificate_arn, region_name in certificate_arns:
        try:
            acm_client = aws_client_factory(region_name=region_name).acm
            acm_client.delete_certificate(CertificateArn=certificate_arn)
        except Exception as e:
            LOG.debug("error cleaning up certificate %s: %s", certificate_arn, e)
1761

1762

1763
# "superuser" IAM policy document: allows every action on every resource
role_policy_su = {
    "Version": "2012-10-17",
    "Statement": [{"Effect": "Allow", "Action": ["*"], "Resource": ["*"]}],
}
1767

1768

1769
@pytest.fixture(scope="session")
def lambda_su_role(aws_client):
    """Session-scoped IAM role with full ("superuser") permissions for Lambda tests.

    Yields the role ARN; role and policy are removed best-effort on teardown.
    """
    role_name = f"lambda-autogenerated-{short_uid()}"
    role = aws_client.iam.create_role(
        RoleName=role_name, AssumeRolePolicyDocument=role_assume_policy
    )["Role"]
    policy_name = f"lambda-autogenerated-{short_uid()}"
    policy_arn = aws_client.iam.create_policy(
        PolicyName=policy_name, PolicyDocument=json.dumps(role_policy_su)
    )["Policy"]["Arn"]
    aws_client.iam.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)

    if is_aws_cloud():  # dirty but necessary
        time.sleep(10)

    yield role["Arn"]

    # bugfix: run_safe must receive the callable and its kwargs, not the result
    # of an eager call - otherwise a failing API call raises during teardown
    # instead of being suppressed
    run_safe(aws_client.iam.detach_role_policy, RoleName=role_name, PolicyArn=policy_arn)
    run_safe(aws_client.iam.delete_role, RoleName=role_name)
    run_safe(aws_client.iam.delete_policy, PolicyArn=policy_arn)
1789

1790

1791
@pytest.fixture
def create_iam_role_and_attach_policy(aws_client):
    """
    Fixture that creates an IAM role with given role definition and predefined policy ARN.

    Use this fixture with AWS managed policies like 'AmazonS3ReadOnlyAccess' or 'AmazonKinesisFullAccess'.
    """
    # (role_name, policy_arn) pairs: the policy must be detached before the role can be deleted
    roles = []

    def _inner(**kwargs: dict[str, Any]) -> str:
        """
        :param dict RoleDefinition: role definition document
        :param str PolicyArn: policy ARN
        :param str RoleName: role name (autogenerated if omitted)
        :return: role ARN
        """
        if "RoleName" not in kwargs:
            kwargs["RoleName"] = f"test-role-{short_uid()}"

        role = kwargs["RoleName"]
        role_policy = json.dumps(kwargs["RoleDefinition"])

        result = aws_client.iam.create_role(RoleName=role, AssumeRolePolicyDocument=role_policy)
        role_arn = result["Role"]["Arn"]

        policy_arn = kwargs["PolicyArn"]
        aws_client.iam.attach_role_policy(PolicyArn=policy_arn, RoleName=role)

        roles.append((role, policy_arn))
        return role_arn

    yield _inner

    for role, policy_arn in roles:
        # detach the managed policy first: DeleteRole fails with a
        # DeleteConflict error while policies are still attached
        try:
            aws_client.iam.detach_role_policy(RoleName=role, PolicyArn=policy_arn)
        except Exception as exc:
            LOG.debug("Error detaching policy '%s' from role '%s': %s", policy_arn, role, exc)
        try:
            aws_client.iam.delete_role(RoleName=role)
        except Exception as exc:
            LOG.debug("Error deleting IAM role '%s': %s", role, exc)
1829

1830

1831
@pytest.fixture
def create_iam_role_with_policy(aws_client):
    """
    Fixture that creates an IAM role with given role definition and policy definition.
    """
    # role name -> inline policy name; the inline policy must be deleted before the role
    roles = {}

    def _create_role_and_policy(**kwargs: dict[str, Any]) -> str:
        """
        :param dict RoleDefinition: role definition document
        :param dict PolicyDefinition: policy definition document
        :param str PolicyName: policy name (autogenerated if omitted)
        :param str RoleName: role name (autogenerated if omitted)
        :return: role ARN
        """
        if "RoleName" not in kwargs:
            kwargs["RoleName"] = f"test-role-{short_uid()}"
        role = kwargs["RoleName"]
        if "PolicyName" not in kwargs:
            kwargs["PolicyName"] = f"test-policy-{short_uid()}"
        policy = kwargs["PolicyName"]
        role_policy = json.dumps(kwargs["RoleDefinition"])

        result = aws_client.iam.create_role(RoleName=role, AssumeRolePolicyDocument=role_policy)
        role_arn = result["Role"]["Arn"]

        policy_document = json.dumps(kwargs["PolicyDefinition"])
        aws_client.iam.put_role_policy(
            RoleName=role, PolicyName=policy, PolicyDocument=policy_document
        )
        roles[role] = policy
        return role_arn

    yield _create_role_and_policy

    for role_name, policy_name in roles.items():
        # delete the inline policy first, otherwise DeleteRole fails
        try:
            aws_client.iam.delete_role_policy(RoleName=role_name, PolicyName=policy_name)
        except Exception as exc:
            LOG.debug("Error deleting IAM role policy '%s' '%s': %s", role_name, policy_name, exc)
        try:
            aws_client.iam.delete_role(RoleName=role_name)
        except Exception as exc:
            LOG.debug("Error deleting IAM role '%s': %s", role_name, exc)
1875

1876

1877
@pytest.fixture
def firehose_create_delivery_stream(wait_for_delivery_stream_ready, aws_client):
    """Factory fixture creating Firehose delivery streams and waiting until they are ACTIVE.

    Streams are deleted best-effort on teardown.
    """
    delivery_stream_names = []

    def _create_delivery_stream(**kwargs):
        if "DeliveryStreamName" not in kwargs:
            kwargs["DeliveryStreamName"] = f"test-delivery-stream-{short_uid()}"
        delivery_stream_name = kwargs["DeliveryStreamName"]
        response = aws_client.firehose.create_delivery_stream(**kwargs)
        delivery_stream_names.append(delivery_stream_name)
        # block until the stream is usable, so tests can put records right away
        wait_for_delivery_stream_ready(delivery_stream_name)
        return response

    yield _create_delivery_stream

    for delivery_stream_name in delivery_stream_names:
        try:
            aws_client.firehose.delete_delivery_stream(DeliveryStreamName=delivery_stream_name)
        except Exception:
            LOG.info("Failed to delete delivery stream %s", delivery_stream_name)
1897

1898

1899
@pytest.fixture
def ses_configuration_set(aws_client):
    """Factory fixture creating SES configuration sets, removed again on teardown."""
    created_names = []

    def factory(name: str) -> None:
        aws_client.ses.create_configuration_set(
            ConfigurationSet={
                "Name": name,
            },
        )
        created_names.append(name)

    yield factory

    for created_name in created_names:
        aws_client.ses.delete_configuration_set(ConfigurationSetName=created_name)
1915

1916

1917
@pytest.fixture
def ses_configuration_set_sns_event_destination(aws_client):
    """Factory fixture wiring an SNS event destination into an SES configuration set.

    All created destinations are removed on teardown.
    """
    # (config_set_name, event_destination_name) pairs for cleanup
    event_destinations = []

    def factory(config_set_name: str, event_destination_name: str, topic_arn: str) -> None:
        aws_client.ses.create_configuration_set_event_destination(
            ConfigurationSetName=config_set_name,
            EventDestination={
                "Name": event_destination_name,
                "Enabled": True,
                "MatchingEventTypes": ["send", "bounce", "delivery", "open", "click"],
                "SNSDestination": {
                    "TopicARN": topic_arn,
                },
            },
        )
        event_destinations.append((config_set_name, event_destination_name))

    yield factory

    for created_config_set_name, created_event_destination_name in event_destinations:
        aws_client.ses.delete_configuration_set_event_destination(
            ConfigurationSetName=created_config_set_name,
            EventDestinationName=created_event_destination_name,
        )
1942

1943

1944
@pytest.fixture
def ses_email_template(aws_client):
    """Factory fixture creating SES email templates, deleted again on teardown."""
    template_names = []

    def factory(name: str, contents: str, subject: Optional[str] = None):
        # generate the default subject per call; the previous computed default
        # argument (f"Email template {short_uid()}") was evaluated only once at
        # definition time and therefore shared across all invocations
        if subject is None:
            subject = f"Email template {short_uid()}"
        aws_client.ses.create_template(
            Template={
                "TemplateName": name,
                "SubjectPart": subject,
                "TextPart": contents,
            }
        )
        template_names.append(name)

    yield factory

    for template_name in template_names:
        aws_client.ses.delete_template(TemplateName=template_name)
1962

1963

1964
@pytest.fixture
def ses_verify_identity(aws_client):
    """Factory fixture verifying SES email identities, deleted again on teardown."""
    identities = []

    def factory(email_address: str) -> None:
        aws_client.ses.verify_email_identity(EmailAddress=email_address)
        # bugfix: track the identity - previously nothing was ever appended,
        # so the teardown loop below never deleted any identity
        identities.append(email_address)

    yield factory

    for identity in identities:
        aws_client.ses.delete_identity(Identity=identity)
1975

1976

1977
@pytest.fixture
def setup_sender_email_address(ses_verify_identity):
    """
    If the test is running against AWS then assume the email address passed is already
    verified, and passes the given email address through. Otherwise, it generates one random
    email address and verify them.
    """

    def inner(sender_email_address: Optional[str] = None) -> str:
        if is_aws_cloud():
            # against AWS we cannot auto-verify a random address, so the caller must supply one
            if sender_email_address is None:
                raise ValueError(
                    "sender_email_address must be specified to run this test against AWS"
                )
        else:
            # overwrite the given parameters with localstack specific ones
            sender_email_address = f"sender-{short_uid()}@example.com"
            ses_verify_identity(sender_email_address)

        return sender_email_address

    return inner
1999

2000

2001
@pytest.fixture
def ec2_create_security_group(aws_client):
    """Factory fixture creating EC2 security groups with ingress rules for given ports.

    Groups are deleted best-effort on teardown.
    """
    ec2_sgs = []

    def factory(ports=None, ip_protocol: str = "tcp", **kwargs):
        """
        Create the target group and authorize the security group ingress.
        :param ports: list of ports to be authorized for the ingress rule.
        :param ip_protocol: the ip protocol for the permissions (tcp by default)
        """
        if "GroupName" not in kwargs:
            kwargs["GroupName"] = f"sg-{short_uid()}"
        # Making sure the call to CreateSecurityGroup gets the right arguments
        _args = select_from_typed_dict(CreateSecurityGroupRequest, kwargs)
        security_group = aws_client.ec2.create_security_group(**_args)
        security_group_id = security_group["GroupId"]
        # one permission entry per port, each open to the world (0.0.0.0/0)
        permissions = [
            {
                "FromPort": port,
                "IpProtocol": ip_protocol,
                "IpRanges": [{"CidrIp": "0.0.0.0/0"}],
                "ToPort": port,
            }
            for port in ports or []
        ]
        aws_client.ec2.authorize_security_group_ingress(
            GroupId=security_group_id,
            IpPermissions=permissions,
        )

        ec2_sgs.append(security_group_id)
        return security_group

    yield factory

    for sg_group_id in ec2_sgs:
        try:
            aws_client.ec2.delete_security_group(GroupId=sg_group_id)
        except Exception as e:
            LOG.debug("Error cleaning up EC2 security group: %s, %s", sg_group_id, e)
2041

2042

2043
@pytest.fixture
def cleanups():
    """Yields a list that tests append zero-arg callables to; they are executed
    after the test in reverse (LIFO) order, logging but not raising failures."""
    callbacks = []

    yield callbacks

    while callbacks:
        callback = callbacks.pop()
        try:
            callback()
        except Exception as e:
            LOG.warning("Failed to execute cleanup", exc_info=e)
2054

2055

2056
@pytest.fixture(scope="session")
def account_id(aws_client):
    """Account ID of the primary test account (live STS lookup when available)."""
    if not (is_aws_cloud() or is_api_enabled("sts")):
        return TEST_AWS_ACCOUNT_ID
    return aws_client.sts.get_caller_identity()["Account"]
2062

2063

2064
@pytest.fixture(scope="session")
def region_name(aws_client):
    """Region of the primary test client (taken from the STS client when available)."""
    if not (is_aws_cloud() or is_api_enabled("sts")):
        return TEST_AWS_REGION_NAME
    return aws_client.sts.meta.region_name
2070

2071

2072
@pytest.fixture(scope="session")
def partition(region_name):
    """AWS partition (e.g. "aws", "aws-cn") derived from the test region."""
    resolved_partition = get_partition(region_name)
    return resolved_partition
2075

2076

2077
@pytest.fixture(scope="session")
def secondary_account_id(secondary_aws_client):
    """Account ID of the secondary test account (live STS lookup when available)."""
    if not (is_aws_cloud() or is_api_enabled("sts")):
        return SECONDARY_TEST_AWS_ACCOUNT_ID
    return secondary_aws_client.sts.get_caller_identity()["Account"]
2083

2084

2085
@pytest.fixture(scope="session")
def secondary_region_name():
    """Region used for cross-region tests, distinct from the primary test region."""
    return SECONDARY_TEST_AWS_REGION_NAME
2088

2089

2090
@pytest.hookimpl
def pytest_collection_modifyitems(config: Config, items: list[Item]):
    """Pytest collection hook applied to every collected test item.

    - Tests carrying any ``*only_localstack`` marker are skipped when running
      against real AWS.
    - Tests that use the ``snapshot`` fixture get the ``snapshot_validated``
      marker added.
    """
    only_localstack = pytest.mark.skipif(
        is_aws_cloud(),
        reason="test only applicable if run against localstack",
    )
    for item in items:
        for mark in item.iter_markers():
            if mark.name.endswith("only_localstack"):
                item.add_marker(only_localstack)
        if hasattr(item, "fixturenames") and "snapshot" in item.fixturenames:
            # add a marker that indicates that this test is snapshot validated
            # if it uses the snapshot fixture -> allows selecting only snapshot
            # validated tests in order to capture new snapshots for a whole
            # test file with "-m snapshot_validated"
            item.add_marker("snapshot_validated")
2106

2107

2108
@pytest.fixture
def sample_stores() -> AccountRegionBundle:
    """Returns a fresh AccountRegionBundle over a sample store for testing the
    cross-account/cross-region attribute machinery.

    ``validate=False`` skips account/region validation so tests may use
    arbitrary IDs.
    """

    class SampleStore(BaseStore):
        # shared across all accounts and regions
        CROSS_ACCOUNT_ATTR = CrossAccountAttribute(default=list)
        # shared across regions within one account
        CROSS_REGION_ATTR = CrossRegionAttribute(default=list)
        # unique per (account, region)
        region_specific_attr = LocalAttribute(default=list)

    return AccountRegionBundle("zzz", SampleStore, validate=False)
2116

2117

2118
@pytest.fixture
def create_rest_apigw(aws_client_factory):
    """Factory fixture creating API Gateway REST APIs.

    Returns ``(api_id, name, root_resource_id)`` tuples. Teardown deletes the
    API first, then any usage plans (plus their API keys) that referenced it,
    since a usage plan cannot be deleted while a stage is attached.
    """
    # (api_id, region_name) pairs so teardown uses the matching regional client
    rest_apis = []
    retry_boto_config = None
    if is_aws_cloud():
        retry_boto_config = botocore.config.Config(
            # Api gateway can throttle requests pretty heavily. Leading to potentially undeleted apis
            retries={"max_attempts": 10, "mode": "adaptive"}
        )

    def _create_apigateway_function(**kwargs):
        client_region_name = kwargs.pop("region_name", None)
        apigateway_client = aws_client_factory(
            region_name=client_region_name, config=retry_boto_config
        ).apigateway
        kwargs.setdefault("name", f"api-{short_uid()}")

        response = apigateway_client.create_rest_api(**kwargs)
        api_id = response.get("id")
        rest_apis.append((api_id, client_region_name))

        return api_id, response.get("name"), response.get("rootResourceId")

    yield _create_apigateway_function

    for rest_api_id, _client_region_name in rest_apis:
        apigateway_client = aws_client_factory(
            region_name=_client_region_name,
            config=retry_boto_config,
        ).apigateway
        # First, retrieve the usage plans associated with the REST API
        usage_plan_ids = []
        usage_plans = apigateway_client.get_usage_plans()
        for item in usage_plans.get("items", []):
            api_stages = item.get("apiStages", [])
            usage_plan_ids.extend(
                item.get("id") for api_stage in api_stages if api_stage.get("apiId") == rest_api_id
            )

        # Then delete the API, as you can't delete the UsagePlan if a stage is associated with it
        with contextlib.suppress(Exception):
            apigateway_client.delete_rest_api(restApiId=rest_api_id)

        # finally delete the usage plans and the API Keys linked to it
        for usage_plan_id in usage_plan_ids:
            usage_plan_keys = apigateway_client.get_usage_plan_keys(usagePlanId=usage_plan_id)
            for key in usage_plan_keys.get("items", []):
                apigateway_client.delete_api_key(apiKey=key["id"])
            apigateway_client.delete_usage_plan(usagePlanId=usage_plan_id)
2167

2168

2169
@pytest.fixture
def create_rest_apigw_openapi(aws_client_factory):
    """Factory fixture importing API Gateway REST APIs from an OpenAPI document.

    Returns ``(api_id, response)``; imported APIs are deleted on teardown.
    """
    # (api_id, region_name) pairs so teardown uses the matching regional client
    rest_apis = []

    def _create_apigateway_function(**kwargs):
        region_name = kwargs.pop("region_name", None)
        apigateway_client = aws_client_factory(region_name=region_name).apigateway

        response = apigateway_client.import_rest_api(**kwargs)
        api_id = response.get("id")
        rest_apis.append((api_id, region_name))
        return api_id, response

    yield _create_apigateway_function

    for rest_api_id, region_name in rest_apis:
        with contextlib.suppress(Exception):
            apigateway_client = aws_client_factory(region_name=region_name).apigateway
            apigateway_client.delete_rest_api(restApiId=rest_api_id)
2188

2189

2190
@pytest.fixture
def appsync_create_api(aws_client):
    """Factory fixture creating AppSync GraphQL APIs, deleted best-effort on teardown.

    Defaults: autogenerated name, API_KEY authentication.
    """
    graphql_apis = []

    def factory(**kwargs):
        if "name" not in kwargs:
            kwargs["name"] = f"graphql-api-testing-name-{short_uid()}"
        if not kwargs.get("authenticationType"):
            kwargs["authenticationType"] = "API_KEY"

        result = aws_client.appsync.create_graphql_api(**kwargs)["graphqlApi"]
        graphql_apis.append(result["apiId"])
        return result

    yield factory

    for api in graphql_apis:
        try:
            aws_client.appsync.delete_graphql_api(apiId=api)
        except Exception as e:
            LOG.debug("Error cleaning up AppSync API: %s, %s", api, e)
2211

2212

2213
@pytest.fixture
def assert_host_customisation(monkeypatch):
    """Patches LOCALSTACK_HOST to a sentinel value and yields an asserter that
    checks whether a URL reflects the configured (or a custom) host.
    """
    # sentinel host: if it shows up in a URL, the URL honoured LOCALSTACK_HOST
    localstack_host = "foo.bar"
    monkeypatch.setattr(
        config, "LOCALSTACK_HOST", config.HostAndPort(host=localstack_host, port=8888)
    )

    def asserter(
        url: str,
        *,
        custom_host: Optional[str] = None,
    ):
        if custom_host is not None:
            assert custom_host in url, f"Could not find `{custom_host}` in `{url}`"

            # a custom host must fully replace the configured one
            assert localstack_host not in url
        else:
            assert localstack_host in url, f"Could not find `{localstack_host}` in `{url}`"

    yield asserter
2233

2234

2235
@pytest.fixture
def echo_http_server(httpserver: HTTPServer):
    """Spins up a local HTTP echo server and returns the endpoint URL"""

    def _echo(request: Request) -> Response:
        # echo back data, headers, url, method and (if parseable) the JSON body
        request_json = None
        if request.is_json:
            with contextlib.suppress(ValueError):
                request_json = json.loads(request.data)
        result = {
            "data": request.data or "{}",
            "headers": dict(request.headers),
            "url": request.url,
            "method": request.method,
            "json": request_json,
        }
        response_body = json.dumps(json_safe(result))
        return Response(response_body, status=200)

    # empty path pattern -> handler matches every request
    httpserver.expect_request("").respond_with_handler(_echo)
    http_endpoint = httpserver.url_for("/")

    return http_endpoint
2258

2259

2260
@pytest.fixture
def echo_http_server_post(echo_http_server):
    """
    Returns an HTTP echo server URL for POST requests that work both locally and for parity tests (against real AWS)
    """
    # against real AWS the local pytest-httpserver is unreachable, so fall back
    # to the public echo service
    if not is_aws_cloud():
        return f"{echo_http_server}post"
    return f"{PUBLIC_HTTP_ECHO_SERVER_URL}/post"
2269

2270

2271
def create_policy_doc(effect: str, actions: List, resource=None) -> Dict:
    """Build an IAM policy document containing a single statement.

    :param effect: statement effect, e.g. "Allow" or "Deny"
    :param actions: a single action or a list of actions
    :param resource: resource (or list of resources); defaults to "*"
    :return: policy document dict
    """
    statement = {
        # TODO statement ids have to be alphanumeric [0-9A-Za-z], write a test for it
        "Sid": f"s{short_uid()}",
        "Effect": effect,
        "Action": ensure_list(actions),
        "Resource": resource if resource else "*",
    }
    return {
        "Version": "2012-10-17",
        "Statement": [statement],
    }
2286

2287

2288
@pytest.fixture
def create_policy_generated_document(create_policy):
    """Creates an IAM policy from a generated single-statement document and
    returns its ARN; cleanup is delegated to the ``create_policy`` fixture."""

    def _create_policy_with_doc(effect, actions, policy_name=None, resource=None, iam_client=None):
        name = policy_name if policy_name else f"p-{short_uid()}"
        document = json.dumps(create_policy_doc(effect, actions, resource=resource))
        response = create_policy(PolicyName=name, PolicyDocument=document, iam_client=iam_client)
        return response["Policy"]["Arn"]

    return _create_policy_with_doc
2300

2301

2302
@pytest.fixture
def create_role_with_policy(create_role, create_policy_generated_document, aws_client):
    """Factory fixture creating an IAM role plus a generated policy.

    With ``attach=True`` the policy is created as a managed policy and
    attached; otherwise it is stored as an inline role policy. Returns
    ``(role_name, role_arn)``. Cleanup is delegated to the underlying
    ``create_role``/``create_policy`` fixtures.
    """

    def _create_role_with_policy(
        effect, actions, assume_policy_doc, resource=None, attach=True, iam_client=None
    ):
        iam_client = iam_client or aws_client.iam

        role_name = f"role-{short_uid()}"
        result = create_role(
            RoleName=role_name, AssumeRolePolicyDocument=assume_policy_doc, iam_client=iam_client
        )
        role_arn = result["Role"]["Arn"]
        policy_name = f"p-{short_uid()}"

        if attach:
            # create role and attach role policy
            policy_arn = create_policy_generated_document(
                effect, actions, policy_name=policy_name, resource=resource, iam_client=iam_client
            )
            iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
        else:
            # put role policy
            policy_document = create_policy_doc(effect, actions, resource=resource)
            policy_document = json.dumps(policy_document)
            iam_client.put_role_policy(
                RoleName=role_name, PolicyName=policy_name, PolicyDocument=policy_document
            )

        return role_name, role_arn

    return _create_role_with_policy
2333

2334

2335
@pytest.fixture
def create_user_with_policy(create_policy_generated_document, create_user, aws_client):
    """Factory fixture creating an IAM user with an attached generated policy
    and a fresh access key. Returns ``(username, access_key_dict)``; cleanup is
    delegated to the underlying ``create_user``/``create_policy`` fixtures.
    """

    def _create_user_with_policy(effect, actions, resource=None):
        policy_arn = create_policy_generated_document(effect, actions, resource=resource)
        username = f"user-{short_uid()}"
        create_user(UserName=username)
        aws_client.iam.attach_user_policy(UserName=username, PolicyArn=policy_arn)
        keys = aws_client.iam.create_access_key(UserName=username)["AccessKey"]
        return username, keys

    return _create_user_with_policy
2346

2347

2348
@pytest.fixture()
def register_extension(s3_bucket, aws_client):
    """Factory fixture registering a CloudFormation extension (resource/module type)
    from a local artifact.

    The artifact is uploaded to S3, registered via ``register_type``, and the
    fixture waits until registration completes and sets the new version as
    default. Teardown deregisters every registered version.
    """
    cfn_client = aws_client.cloudformation
    extensions_arns = []

    def _register(extension_name, extension_type, artifact_path):
        bucket = s3_bucket
        key = f"artifact-{short_uid()}"

        aws_client.s3.upload_file(artifact_path, bucket, key)

        register_response = cfn_client.register_type(
            Type=extension_type,
            TypeName=extension_name,
            SchemaHandlerPackage=f"s3://{bucket}/{key}",
        )

        # registration is asynchronous; block until it finishes
        registration_token = register_response["RegistrationToken"]
        cfn_client.get_waiter("type_registration_complete").wait(
            RegistrationToken=registration_token
        )

        describe_response = cfn_client.describe_type_registration(
            RegistrationToken=registration_token
        )

        extensions_arns.append(describe_response["TypeArn"])
        cfn_client.set_type_default_version(Arn=describe_response["TypeVersionArn"])

        return describe_response

    yield _register

    for arn in extensions_arns:
        # deregister each version individually, then the type itself
        versions = cfn_client.list_type_versions(Arn=arn)["TypeVersionSummaries"]
        for v in versions:
            try:
                cfn_client.deregister_type(Arn=v["Arn"])
            except Exception:
                continue
        cfn_client.deregister_type(Arn=arn)
2389

2390

2391
@pytest.fixture
def hosted_zone(aws_client):
    """Factory fixture creating Route53 hosted zones, deleted again on teardown.

    A ``CallerReference`` is generated when the caller does not supply one.
    Zones are deleted in reverse creation order.
    """
    created_zone_ids = []

    def _create_zone(**kwargs):
        if "CallerReference" not in kwargs:
            kwargs["CallerReference"] = f"ref-{short_uid()}"
        create_response = aws_client.route53.create_hosted_zone(**kwargs)
        created_zone_ids.append(create_response["HostedZone"]["Id"])
        return create_response

    yield _create_zone

    for created_zone_id in reversed(created_zone_ids):
        aws_client.route53.delete_hosted_zone(Id=created_zone_id)
@pytest.fixture
def openapi_validate(monkeypatch):
    """Enable OpenAPI request/response validation for the duration of a test.

    monkeypatch restores the original config values on teardown automatically.

    NOTE(review): these flags were previously patched with the string "true",
    which only worked because any non-empty string is truthy; the config flags
    are boolean-valued, so patch with proper ``True`` booleans to keep the
    attribute type consistent.
    """
    monkeypatch.setattr(config, "OPENAPI_VALIDATE_RESPONSE", True)
    monkeypatch.setattr(config, "OPENAPI_VALIDATE_REQUEST", True)
@pytest.fixture
def set_resource_custom_id():
    """Fixture to assign custom IDs to resources via the localstack ID manager.

    Every custom ID set through the returned callable is unset again on
    teardown, so tests do not leak ID overrides into each other.
    """
    registered_identifiers = []

    def _set_custom_id(resource_identifier: ResourceIdentifier, custom_id):
        localstack_id_manager.set_custom_id(
            resource_identifier=resource_identifier, custom_id=custom_id
        )
        registered_identifiers.append(resource_identifier)

    yield _set_custom_id

    for identifier in registered_identifiers:
        localstack_id_manager.unset_custom_id(identifier)
2431
###############################
2432
# Events (EventBridge) fixtures
2433
###############################
2434

2435

2436
@pytest.fixture
def events_create_event_bus(aws_client, region_name, account_id):
    """Factory fixture to create EventBridge event buses.

    A bus ``Name`` is generated when the caller does not supply one. On
    teardown, every created bus is fully cleaned up: all rules (and their
    targets) are removed first, then archives attached to the bus, and finally
    the bus itself is deleted. Cleanup failures are logged as warnings instead
    of failing the test.
    """
    event_bus_names = []

    def _create_event_bus(**kwargs):
        if "Name" not in kwargs:
            kwargs["Name"] = f"test-event-bus-{short_uid()}"

        response = aws_client.events.create_event_bus(**kwargs)
        event_bus_names.append(kwargs["Name"])
        return response

    def _delete_rules(event_bus_name):
        # Delete all rules of the bus; each rule's targets must be removed
        # before the rule itself can be deleted.
        response = aws_client.events.list_rules(EventBusName=event_bus_name)
        rules = [rule["Name"] for rule in response["Rules"]]
        for rule in rules:
            try:
                response = aws_client.events.list_targets_by_rule(
                    Rule=rule, EventBusName=event_bus_name
                )
                targets = [target["Id"] for target in response["Targets"]]
                for target in targets:
                    aws_client.events.remove_targets(
                        Rule=rule, EventBusName=event_bus_name, Ids=[target]
                    )
                aws_client.events.delete_rule(Name=rule, EventBusName=event_bus_name)
            except Exception as e:
                LOG.warning("Failed to delete rule %s: %s", rule, e)

    def _delete_archives(event_bus_name):
        # Delete archives whose event source is this bus; archives would
        # otherwise block (or outlive) the bus deletion.
        event_source_arn = (
            f"arn:aws:events:{region_name}:{account_id}:event-bus/{event_bus_name}"
        )
        response = aws_client.events.list_archives(EventSourceArn=event_source_arn)
        archives = [archive["ArchiveName"] for archive in response["Archives"]]
        for archive in archives:
            try:
                aws_client.events.delete_archive(ArchiveName=archive)
            except Exception as e:
                LOG.warning("Failed to delete archive %s: %s", archive, e)

    yield _create_event_bus

    for event_bus_name in event_bus_names:
        try:
            _delete_rules(event_bus_name)
            _delete_archives(event_bus_name)
            aws_client.events.delete_event_bus(Name=event_bus_name)
        except Exception as e:
            LOG.warning("Failed to delete event bus %s: %s", event_bus_name, e)
@pytest.fixture
def events_put_rule(aws_client):
    """Factory fixture wrapping ``events.put_rule``.

    Generates a rule ``Name`` when missing. On teardown each created rule has
    its targets removed and is then deleted; failures are logged as warnings.
    """
    created_rules = []

    def _put_rule(**kwargs):
        if "Name" not in kwargs:
            kwargs["Name"] = f"rule-{short_uid()}"

        put_response = aws_client.events.put_rule(**kwargs)
        created_rules.append((kwargs["Name"], kwargs.get("EventBusName", "default")))
        return put_response

    yield _put_rule

    for rule_name, bus_name in created_rules:
        try:
            list_response = aws_client.events.list_targets_by_rule(
                Rule=rule_name, EventBusName=bus_name
            )
            target_ids = [target["Id"] for target in list_response["Targets"]]

            # a rule with attached targets cannot be deleted
            for target_id in target_ids:
                aws_client.events.remove_targets(
                    Rule=rule_name, EventBusName=bus_name, Ids=[target_id]
                )

            aws_client.events.delete_rule(Name=rule_name, EventBusName=bus_name)
        except Exception as e:
            LOG.warning("Failed to delete rule %s: %s", rule_name, e)
@pytest.fixture
def events_create_rule(aws_client):
    """Factory fixture creating EventBridge rules and returning their ARNs.

    Expects ``Name`` plus optional ``EventBusName``, ``EventPattern`` and
    ``ScheduleExpression`` keyword arguments. On teardown, each rule's targets
    are removed and the rule is deleted.
    """
    created_rules = []

    def _create_rule(**kwargs):
        rule_name = kwargs["Name"]
        bus_name = kwargs.get("EventBusName", "")
        event_pattern = kwargs.get("EventPattern", {})
        schedule_expression = kwargs.get("ScheduleExpression", "")
        put_response = aws_client.events.put_rule(
            Name=rule_name,
            EventBusName=bus_name,
            EventPattern=json.dumps(event_pattern),
            ScheduleExpression=schedule_expression,
        )
        created_rules.append({"name": rule_name, "bus": bus_name})
        return put_response["RuleArn"]

    yield _create_rule

    for entry in created_rules:
        targets = aws_client.events.list_targets_by_rule(
            Rule=entry["name"], EventBusName=entry["bus"]
        )["Targets"]

        # targets must be removed before the rule can be deleted
        target_ids = [target["Id"] for target in targets]
        if target_ids:
            aws_client.events.remove_targets(
                Rule=entry["name"], EventBusName=entry["bus"], Ids=target_ids
            )

        aws_client.events.delete_rule(Name=entry["name"], EventBusName=entry["bus"])
@pytest.fixture
def sqs_as_events_target(aws_client, sqs_get_queue_arn):
    """Create an SQS queue ready to be used as an EventBridge target.

    The queue gets a resource policy allowing ``events.amazonaws.com`` to send
    messages. Returns ``(queue_url, queue_arn)``. Queues are deleted on
    teardown (best-effort).
    """
    created_queue_urls = []

    def _sqs_as_events_target(queue_name: str | None = None) -> tuple[str, str]:
        if not queue_name:
            queue_name = f"tests-queue-{short_uid()}"
        sqs_client = aws_client.sqs

        queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]
        created_queue_urls.append(queue_url)
        queue_arn = sqs_get_queue_arn(queue_url)

        # allow EventBridge to deliver events into this queue
        allow_eventbridge_policy = {
            "Version": "2012-10-17",
            "Id": f"sqs-eventbridge-{short_uid()}",
            "Statement": [
                {
                    "Sid": f"SendMessage-{short_uid()}",
                    "Effect": "Allow",
                    "Principal": {"Service": "events.amazonaws.com"},
                    "Action": "sqs:SendMessage",
                    "Resource": queue_arn,
                }
            ],
        }
        sqs_client.set_queue_attributes(
            QueueUrl=queue_url, Attributes={"Policy": json.dumps(allow_eventbridge_policy)}
        )
        return queue_url, queue_arn

    yield _sqs_as_events_target

    for queue_url in created_queue_urls:
        try:
            aws_client.sqs.delete_queue(QueueUrl=queue_url)
        except Exception as e:
            LOG.debug("error cleaning up queue %s: %s", queue_url, e)
@pytest.fixture
def clean_up(
    aws_client,
):  # TODO: legacy clean up fixtures for eventbridge - remove and use individual fixtures for creating resources instead
    def _clean_up(
        bus_name=None,
        rule_name=None,
        target_ids=None,
        queue_url=None,
        log_group_name=None,
    ):
        """Best-effort cleanup of EventBridge-related test resources.

        Each argument is optional; only the resources whose identifiers are
        given are cleaned up. All deletions go through ``call_safe``, so
        failures are swallowed (logged) rather than failing the test.
        Deletion order matters: targets before rules, rules before the bus.
        """
        events_client = aws_client.events
        # rules/targets live on a specific bus; only pass EventBusName if given
        kwargs = {"EventBusName": bus_name} if bus_name else {}
        if target_ids:
            # accept a single target id or a list of ids
            target_ids = target_ids if isinstance(target_ids, list) else [target_ids]
            call_safe(
                events_client.remove_targets,
                kwargs=dict(Rule=rule_name, Ids=target_ids, Force=True, **kwargs),
            )
        if rule_name:
            call_safe(events_client.delete_rule, kwargs=dict(Name=rule_name, Force=True, **kwargs))
        if bus_name:
            call_safe(events_client.delete_event_bus, kwargs=dict(Name=bus_name))
        if queue_url:
            sqs_client = aws_client.sqs
            call_safe(sqs_client.delete_queue, kwargs=dict(QueueUrl=queue_url))
        if log_group_name:
            logs_client = aws_client.logs

            def _delete_log_group():
                # delete all streams first, then the group itself
                log_streams = logs_client.describe_log_streams(logGroupName=log_group_name)
                for log_stream in log_streams["logStreams"]:
                    logs_client.delete_log_stream(
                        logGroupName=log_group_name, logStreamName=log_stream["logStreamName"]
                    )
                logs_client.delete_log_group(logGroupName=log_group_name)

            call_safe(_delete_log_group)

    yield _clean_up
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc