• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20610689355

29 Dec 2025 01:37PM UTC coverage: 86.918% (-0.003%) from 86.921%
20610689355

push

github

web-flow
IAM: Update `create_user_with_policy` fixture (#13568)

2 of 5 new or added lines in 1 file covered. (40.0%)

30 existing lines in 4 files now uncovered.

70053 of 80597 relevant lines covered (86.92%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.19
/localstack-core/localstack/testing/pytest/fixtures.py
1
import contextlib
1✔
2
import dataclasses
1✔
3
import json
1✔
4
import logging
1✔
5
import os
1✔
6
import re
1✔
7
import textwrap
1✔
8
import time
1✔
9
from collections.abc import Callable
1✔
10
from typing import TYPE_CHECKING, Any, Unpack
1✔
11

12
import botocore.auth
1✔
13
import botocore.config
1✔
14
import botocore.credentials
1✔
15
import botocore.session
1✔
16
import pytest
1✔
17
from _pytest.config import Config
1✔
18
from _pytest.nodes import Item
1✔
19
from botocore.exceptions import ClientError
1✔
20
from botocore.regions import EndpointResolver
1✔
21
from pytest_httpserver import HTTPServer
1✔
22
from werkzeug import Request, Response
1✔
23

24
from localstack import config
1✔
25
from localstack.aws.api.cloudformation import CreateChangeSetInput, Parameter
1✔
26
from localstack.aws.api.ec2 import CreateSecurityGroupRequest, CreateVpcEndpointRequest, VpcEndpoint
1✔
27
from localstack.aws.connect import ServiceLevelClientFactory
1✔
28
from localstack.services.stores import (
1✔
29
    AccountRegionBundle,
30
    BaseStore,
31
    CrossAccountAttribute,
32
    CrossRegionAttribute,
33
    LocalAttribute,
34
)
35
from localstack.testing.aws.cloudformation_utils import load_template_file, render_template
1✔
36
from localstack.testing.aws.util import get_lambda_logs, is_aws_cloud, wait_for_user
1✔
37
from localstack.testing.config import (
1✔
38
    SECONDARY_TEST_AWS_ACCOUNT_ID,
39
    SECONDARY_TEST_AWS_REGION_NAME,
40
    TEST_AWS_ACCOUNT_ID,
41
    TEST_AWS_REGION_NAME,
42
)
43
from localstack.utils import testutil
1✔
44
from localstack.utils.aws.arns import get_partition
1✔
45
from localstack.utils.aws.client import SigningHttpClient
1✔
46
from localstack.utils.aws.resources import create_dynamodb_table
1✔
47
from localstack.utils.bootstrap import is_api_enabled
1✔
48
from localstack.utils.collections import ensure_list, select_from_typed_dict
1✔
49
from localstack.utils.functions import call_safe, run_safe
1✔
50
from localstack.utils.http import safe_requests as requests
1✔
51
from localstack.utils.id_generator import ResourceIdentifier, localstack_id_manager
1✔
52
from localstack.utils.json import CustomEncoder, json_safe
1✔
53
from localstack.utils.net import wait_for_port_open
1✔
54
from localstack.utils.strings import short_uid, to_str
1✔
55
from localstack.utils.sync import ShortCircuitWaitException, poll_condition, retry, wait_until
1✔
56

57
LOG = logging.getLogger(__name__)
1✔
58

59
# URL of public HTTP echo server, used primarily for AWS parity/snapshot testing
60
PUBLIC_HTTP_ECHO_SERVER_URL = "http://httpbin.org"
1✔
61

62
WAITER_CHANGE_SET_CREATE_COMPLETE = "change_set_create_complete"
1✔
63
WAITER_STACK_CREATE_COMPLETE = "stack_create_complete"
1✔
64
WAITER_STACK_UPDATE_COMPLETE = "stack_update_complete"
1✔
65
WAITER_STACK_DELETE_COMPLETE = "stack_delete_complete"
1✔
66

67

68
if TYPE_CHECKING:
1✔
69
    from mypy_boto3_sqs import SQSClient
×
70
    from mypy_boto3_sqs.type_defs import MessageTypeDef
×
71

72

73
@pytest.fixture(scope="session")
def aws_client_no_retry(aws_client_factory):
    """
    Boto clients with retries disabled (``max_attempts=1``) for testing.
    botocore docs: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#configuring-a-retry-mode

    Use this client when testing exceptions (i.e., with pytest.raises(...)) or expected errors (e.g., status code 500)
    to avoid unnecessary retries and mitigate test flakiness if the tested error condition is time-bound.

    This client is needed for the following errors, exceptions, and HTTP status codes defined by the legacy retry mode:
    https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#legacy-retry-mode
    General socket/connection errors:
    * ConnectionError
    * ConnectionClosedError
    * ReadTimeoutError
    * EndpointConnectionError

    Service-side throttling/limit errors and exceptions:
    * Throttling
    * ThrottlingException
    * ThrottledException
    * RequestThrottledException
    * ProvisionedThroughputExceededException

    HTTP status codes: 429, 500, 502, 503, 504, and 509

    Hence, this client is not needed for a `ResourceNotFound` error (but it doesn't harm).
    """
    return aws_client_factory(config=botocore.config.Config(retries={"max_attempts": 1}))
103

104

105
@pytest.fixture(scope="class")
def aws_http_client_factory(aws_session):
    """
    Returns a factory for creating new ``SigningHttpClient`` instances using a configurable botocore request signer.
    The default signer is a SigV4QueryAuth. The credentials are extracted from the ``aws_session`` fixture that
    transparently uses your global profile when TEST_TARGET=AWS_CLOUD, or test credentials when running against
    LocalStack.

    Example invocations

        client = aws_http_client_factory("sqs")
        client.get("http://localhost:4566/000000000000/my-queue")

    or
        client = aws_http_client_factory("dynamodb", signer_factory=SigV4Auth)
        client.post("...")
    """

    def factory(
        service: str,
        region: str = None,
        signer_factory: Callable[
            [botocore.credentials.Credentials, str, str], botocore.auth.BaseSigner
        ] = botocore.auth.SigV4QueryAuth,
        endpoint_url: str = None,
        aws_access_key_id: str = None,
        aws_secret_access_key: str = None,
    ):
        effective_region = region or TEST_AWS_REGION_NAME

        # Explicitly supplied keys take precedence over the session's credentials.
        if aws_access_key_id or aws_secret_access_key:
            credentials = botocore.credentials.Credentials(
                access_key=aws_access_key_id, secret_key=aws_secret_access_key
            )
        else:
            credentials = aws_session.get_credentials()
        frozen_credentials = credentials.get_frozen_credentials()

        resolved_endpoint = endpoint_url
        if not resolved_endpoint:
            if is_aws_cloud():
                # FIXME: this is a bit raw. we should probably re-use boto in a better way
                resolver: EndpointResolver = aws_session._session.get_component("endpoint_resolver")
                hostname = resolver.construct_endpoint(service, effective_region)["hostname"]
                resolved_endpoint = "https://" + hostname
            else:
                resolved_endpoint = config.internal_service_url()

        signer = signer_factory(frozen_credentials, service, effective_region)
        return SigningHttpClient(signer, endpoint_url=resolved_endpoint)

    return factory
155

156

157
@pytest.fixture(scope="class")
def s3_vhost_client(aws_client_factory, region_name):
    """Return an S3 client configured for virtual-host style bucket addressing."""
    vhost_config = botocore.config.Config(s3={"addressing_style": "virtual"})
    clients = aws_client_factory(config=vhost_config, region_name=region_name)
    return clients.s3
162

163

164
@pytest.fixture
def dynamodb_wait_for_table_active(aws_client):
    """Return a helper that blocks (up to 30s) until a DynamoDB table reports ACTIVE."""

    def wait_for_table_active(table_name: str, client=None):
        ddb = client or aws_client.dynamodb

        def _is_active():
            description = ddb.describe_table(TableName=table_name)
            return description["Table"]["TableStatus"] == "ACTIVE"

        poll_condition(_is_active, timeout=30)

    return wait_for_table_active
175

176

177
@pytest.fixture
def dynamodb_create_table_with_parameters(dynamodb_wait_for_table_active, aws_client):
    """Factory fixture creating DynamoDB tables from raw ``create_table`` kwargs.

    Waits for each table to become ACTIVE and deletes all created tables on teardown.
    """
    created_tables = []

    def factory(**kwargs):
        table_name = kwargs.setdefault("TableName", f"test-table-{short_uid()}")

        created_tables.append(table_name)
        response = aws_client.dynamodb.create_table(**kwargs)
        dynamodb_wait_for_table_active(table_name)
        return response

    yield factory

    # cleanup: a table has to be in ACTIVE state before deletion
    for table_name in created_tables:
        try:
            dynamodb_wait_for_table_active(table_name)
            aws_client.dynamodb.delete_table(TableName=table_name)
        except Exception as e:
            LOG.debug("error cleaning up table %s: %s", table_name, e)
200

201

202
@pytest.fixture
def dynamodb_create_table(dynamodb_wait_for_table_active, aws_client):
    """Factory fixture around the ``create_dynamodb_table`` utility.

    Beware: the utility may swallow exceptions raised during table creation.
    Created tables are removed on teardown.
    """
    created_tables = []

    def factory(**kwargs):
        kwargs["client"] = aws_client.dynamodb
        kwargs.setdefault("table_name", f"test-table-{short_uid()}")
        kwargs.setdefault("partition_key", "id")

        created_tables.append(kwargs["table_name"])

        return create_dynamodb_table(**kwargs)

    yield factory

    # cleanup: a table has to be in ACTIVE state before deletion
    for table_name in created_tables:
        try:
            dynamodb_wait_for_table_active(table_name)
            aws_client.dynamodb.delete_table(TableName=table_name)
        except Exception as e:
            LOG.debug("error cleaning up table %s: %s", table_name, e)
228

229

230
@pytest.fixture
def s3_create_bucket(s3_empty_bucket, aws_client):
    """Factory fixture creating S3 buckets; empties and deletes them on teardown."""
    created_buckets = []

    def factory(**kwargs) -> str:
        bucket_name = kwargs.setdefault("Bucket", f"test-bucket-{short_uid()}")

        # Outside us-east-1, S3 requires an explicit location constraint.
        region = aws_client.s3.meta.region_name
        if "CreateBucketConfiguration" not in kwargs and region != "us-east-1":
            kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}

        aws_client.s3.create_bucket(**kwargs)
        created_buckets.append(bucket_name)
        return bucket_name

    yield factory

    # cleanup
    for bucket_name in created_buckets:
        try:
            s3_empty_bucket(bucket_name)
            aws_client.s3.delete_bucket(Bucket=bucket_name)
        except Exception as e:
            LOG.debug("error cleaning up bucket %s: %s", bucket_name, e)
259

260

261
@pytest.fixture
def s3_create_bucket_with_client(s3_empty_bucket, aws_client):
    """Factory fixture creating S3 buckets with a caller-supplied S3 client.

    Cleanup (empty + delete) runs with the default ``aws_client`` on teardown.
    """
    created_buckets = []

    def factory(s3_client, **kwargs) -> str:
        kwargs.setdefault("Bucket", f"test-bucket-{short_uid()}")

        response = s3_client.create_bucket(**kwargs)
        created_buckets.append(kwargs["Bucket"])
        return response

    yield factory

    # cleanup
    for bucket_name in created_buckets:
        try:
            s3_empty_bucket(bucket_name)
            aws_client.s3.delete_bucket(Bucket=bucket_name)
        except Exception as e:
            LOG.debug("error cleaning up bucket %s: %s", bucket_name, e)
282

283

284
@pytest.fixture
def s3_bucket(s3_create_bucket, aws_client) -> str:
    """Create one throwaway S3 bucket in the client's region and return its name."""
    region = aws_client.s3.meta.region_name
    if region == "us-east-1":
        return s3_create_bucket()
    # Outside us-east-1, S3 requires an explicit location constraint.
    return s3_create_bucket(CreateBucketConfiguration={"LocationConstraint": region})
291

292

293
@pytest.fixture
def s3_empty_bucket(aws_client):
    """
    Returns a factory that given a bucket name, deletes all objects and deletes all object versions
    """

    # Boto resource would make this a straightforward task, but our internal client does not support Boto resource
    # FIXME: this won't work when bucket has more than 1000 objects
    def factory(bucket_name: str):
        kwargs = {}
        try:
            # If the bucket has an object lock configuration, governance-retained
            # objects can only be removed with BypassGovernanceRetention.
            aws_client.s3.get_object_lock_configuration(Bucket=bucket_name)
            kwargs["BypassGovernanceRetention"] = True
        except ClientError:
            # no object lock configured -> a plain delete is sufficient
            pass

        # First pass: delete the current objects (at most 1000, see FIXME above).
        response = aws_client.s3.list_objects_v2(Bucket=bucket_name)
        objects = [{"Key": obj["Key"]} for obj in response.get("Contents", [])]
        if objects:
            aws_client.s3.delete_objects(
                Bucket=bucket_name,
                Delete={"Objects": objects},
                **kwargs,
            )

        # Second pass: delete all object versions and delete markers, so that
        # versioned buckets can actually be deleted afterwards.
        response = aws_client.s3.list_object_versions(Bucket=bucket_name)
        versions = response.get("Versions", [])
        versions.extend(response.get("DeleteMarkers", []))

        object_versions = [{"Key": obj["Key"], "VersionId": obj["VersionId"]} for obj in versions]
        if object_versions:
            aws_client.s3.delete_objects(
                Bucket=bucket_name,
                Delete={"Objects": object_versions},
                **kwargs,
            )

    yield factory
331

332

333
@pytest.fixture
def sqs_create_queue(aws_client):
    """Factory fixture creating SQS queues; deletes all created queues on teardown."""
    created_queue_urls = []

    def factory(**kwargs):
        kwargs.setdefault("QueueName", f"test-queue-{short_uid()}")

        queue_url = aws_client.sqs.create_queue(**kwargs)["QueueUrl"]
        created_queue_urls.append(queue_url)

        return queue_url

    yield factory

    # cleanup
    for queue_url in created_queue_urls:
        try:
            aws_client.sqs.delete_queue(QueueUrl=queue_url)
        except Exception as e:
            LOG.debug("error cleaning up queue %s: %s", queue_url, e)
355

356

357
@pytest.fixture
def sqs_receive_messages_delete(aws_client):
    """Return a helper that receives messages from a queue, JSON-decodes their
    bodies, deletes the received messages from the queue, and returns the decoded
    payloads.

    NOTE: when nothing is received, ``response["Messages"]`` raises ``KeyError``;
    ``sqs_receive_num_messages`` relies on that to detect empty polls - do not
    "fix" this by using ``.get``.
    """

    def factory(
        queue_url: str,
        expected_messages: int | None = None,
        wait_time: int | None = 5,
    ):
        response = aws_client.sqs.receive_message(
            QueueUrl=queue_url,
            MessageAttributeNames=["All"],
            VisibilityTimeout=0,
            WaitTimeSeconds=wait_time,
        )
        # decode each message body as JSON (KeyError here means: nothing received)
        messages = []
        for m in response["Messages"]:
            message = json.loads(to_str(m["Body"]))
            messages.append(message)

        if expected_messages is not None:
            assert len(messages) == expected_messages

        # remove the received messages from the queue
        for message in response["Messages"]:
            aws_client.sqs.delete_message(
                QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"]
            )

        return messages

    return factory
386

387

388
@pytest.fixture
def sqs_receive_num_messages(sqs_receive_messages_delete):
    """Return a helper that polls a queue until ``expected_messages`` messages
    arrived, raising ``AssertionError`` after ``max_iterations`` unsuccessful polls.
    """

    def factory(queue_url: str, expected_messages: int, max_iterations: int = 3):
        received = []
        for _ in range(max_iterations):
            try:
                batch = sqs_receive_messages_delete(queue_url, wait_time=5)
            except KeyError:
                # the poll returned no messages at all
                continue
            received.extend(batch)

            if len(received) >= expected_messages:
                return received[:expected_messages]

        raise AssertionError(f"max iterations reached with {len(received)} messages received")

    return factory
406

407

408
@pytest.fixture
def sqs_collect_messages(aws_client):
    """Collects SQS messages from a given queue_url and deletes them by default.
    Example usage:
    messages = sqs_collect_messages(
         my_queue_url,
         expected=2,
         timeout=10,
         attribute_names=["All"],
         message_attribute_names=["All"],
    )
    """

    def factory(
        queue_url: str,
        expected: int,
        timeout: int,
        delete: bool = True,
        attribute_names: list[str] = None,
        message_attribute_names: list[str] = None,
        max_number_of_messages: int = 1,
        wait_time_seconds: int = 5,
        sqs_client: "SQSClient | None" = None,
    ) -> list["MessageTypeDef"]:
        """Poll ``queue_url`` until at least ``expected`` messages were collected or
        ``timeout`` seconds elapsed; raises ``TimeoutError`` in the latter case."""
        sqs_client = sqs_client or aws_client.sqs
        collected = []

        def _receive():
            response = sqs_client.receive_message(
                QueueUrl=queue_url,
                # Maximum is 20 seconds. Performs long polling.
                WaitTimeSeconds=wait_time_seconds,
                # Maximum 10 messages
                MaxNumberOfMessages=max_number_of_messages,
                AttributeNames=attribute_names or [],
                MessageAttributeNames=message_attribute_names or [],
            )

            if messages := response.get("Messages"):
                collected.extend(messages)

                if delete:
                    for m in messages:
                        sqs_client.delete_message(
                            QueueUrl=queue_url, ReceiptHandle=m["ReceiptHandle"]
                        )

            return len(collected) >= expected

        if not poll_condition(_receive, timeout=timeout):
            # fix: the error message was missing its closing parenthesis
            raise TimeoutError(
                f"gave up waiting for messages (expected={expected}, actual={len(collected)})"
            )

        return collected

    yield factory
465

466

467
@pytest.fixture
def sqs_queue(sqs_create_queue):
    """A single throwaway SQS queue; yields its queue URL."""
    return sqs_create_queue()
470

471

472
@pytest.fixture
def sqs_get_queue_arn(aws_client) -> Callable:
    """Return a helper that resolves a queue URL to the queue's ARN."""

    def _get_queue_arn(queue_url: str) -> str:
        attributes = aws_client.sqs.get_queue_attributes(
            QueueUrl=queue_url, AttributeNames=["QueueArn"]
        )["Attributes"]
        return attributes["QueueArn"]

    return _get_queue_arn
480

481

482
@pytest.fixture
def sqs_queue_exists(aws_client):
    """Return a predicate that reports whether a queue with a given URL exists."""

    def _queue_exists(queue_url: str) -> bool:
        """
        Checks whether a queue with the given queue URL exists.
        :param queue_url: the queue URL
        :return: true if the queue exists, false otherwise
        """
        queue_name = queue_url.split("/")[-1]
        try:
            result = aws_client.sqs.get_queue_url(QueueName=queue_name)
        except ClientError as e:
            if "NonExistentQueue" in e.response["Error"]["Code"]:
                return False
            raise
        return result.get("QueueUrl") == queue_url

    yield _queue_exists
499

500

501
@pytest.fixture
def sns_create_topic(aws_client):
    """Factory fixture creating SNS topics; deletes all created topics on teardown."""
    created_topic_arns = []

    def _create_topic(**kwargs):
        kwargs.setdefault("Name", f"test-topic-{short_uid()}")
        response = aws_client.sns.create_topic(**kwargs)
        created_topic_arns.append(response["TopicArn"])
        return response

    yield _create_topic

    # cleanup
    for topic_arn in created_topic_arns:
        try:
            aws_client.sns.delete_topic(TopicArn=topic_arn)
        except Exception as e:
            LOG.debug("error cleaning up topic %s: %s", topic_arn, e)
519

520

521
@pytest.fixture
def sns_wait_for_topic_delete(aws_client):
    """Return a helper that blocks (up to 30s) until the given SNS topic is deleted.

    Deletion is detected by polling ``get_topic_attributes`` until it fails with
    a NotFound error.
    """

    def wait_for_topic_delete(topic_arn: str) -> None:
        def wait():
            try:
                aws_client.sns.get_topic_attributes(TopicArn=topic_arn)
                return False
            # fix: only ClientError carries ``.response``; the previous broad
            # ``except Exception`` crashed with AttributeError on any other error
            except ClientError as e:
                if "NotFound" in e.response["Error"]["Code"]:
                    return True

                raise

        poll_condition(wait, timeout=30)

    return wait_for_topic_delete
537

538

539
@pytest.fixture
def sns_subscription(aws_client):
    """Factory fixture around ``sns.subscribe``; unsubscribes on teardown.

    Requires 'TopicArn', 'Protocol', and 'Endpoint' kwargs; ``ReturnSubscriptionArn``
    defaults to True so the full subscription ARN is returned immediately.
    """
    created_sub_arns = []

    def _create_sub(**kwargs):
        if kwargs.get("ReturnSubscriptionArn") is None:
            kwargs["ReturnSubscriptionArn"] = True

        response = aws_client.sns.subscribe(**kwargs)
        created_sub_arns.append(response["SubscriptionArn"])
        return response

    yield _create_sub

    # cleanup
    for sub_arn in created_sub_arns:
        try:
            aws_client.sns.unsubscribe(SubscriptionArn=sub_arn)
        except Exception as e:
            LOG.debug("error cleaning up subscription %s: %s", sub_arn, e)
560

561

562
@pytest.fixture
def sns_topic(sns_create_topic, aws_client):
    """A single throwaway SNS topic; returns its attributes (incl. TopicArn)."""
    arn = sns_create_topic()["TopicArn"]
    return aws_client.sns.get_topic_attributes(TopicArn=arn)
566

567

568
@pytest.fixture
def sns_allow_topic_sqs_queue(aws_client):
    """Return a helper that installs a queue policy allowing an SNS topic to send
    messages to the given SQS queue."""

    def _allow_sns_topic(sqs_queue_url, sqs_queue_arn, sns_topic_arn) -> None:
        # allow the SNS service principal to send to this queue, restricted to
        # messages originating from the given topic
        policy = {
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {"Service": "sns.amazonaws.com"},
                    "Action": "sqs:SendMessage",
                    "Resource": sqs_queue_arn,
                    "Condition": {"ArnEquals": {"aws:SourceArn": sns_topic_arn}},
                }
            ]
        }
        aws_client.sqs.set_queue_attributes(
            QueueUrl=sqs_queue_url,
            Attributes={"Policy": json.dumps(policy)},
        )

    return _allow_sns_topic
592

593

594
@pytest.fixture
def sns_create_sqs_subscription(sns_allow_topic_sqs_queue, sqs_get_queue_arn, aws_client):
    """Factory fixture subscribing an SQS queue to an SNS topic.

    Also installs the queue policy permitting the topic to send to the queue,
    returns the subscription's attributes, and unsubscribes on teardown.
    """
    subscriptions = []

    def _factory(topic_arn: str, queue_url: str, **kwargs) -> dict[str, str]:
        queue_arn = sqs_get_queue_arn(queue_url)

        # connect sns topic to sqs
        subscription = aws_client.sns.subscribe(
            TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn, **kwargs
        )
        subscription_arn = subscription["SubscriptionArn"]

        # allow topic to write to sqs queue
        sns_allow_topic_sqs_queue(
            sqs_queue_url=queue_url, sqs_queue_arn=queue_arn, sns_topic_arn=topic_arn
        )

        subscriptions.append(subscription_arn)
        return aws_client.sns.get_subscription_attributes(SubscriptionArn=subscription_arn)[
            "Attributes"
        ]

    yield _factory

    for arn in subscriptions:
        try:
            aws_client.sns.unsubscribe(SubscriptionArn=arn)
        except Exception as e:
            # consistency fix: best-effort cleanup failures are logged at debug
            # level, like in all sibling fixtures (was LOG.error)
            LOG.debug("error cleaning up subscription %s: %s", arn, e)
624

625

626
@pytest.fixture
def sns_create_http_endpoint(sns_create_topic, sns_subscription, aws_client):
    """Return a factory that creates an SNS topic with an HTTP subscription backed
    by a local ``HTTPServer``; returns (topic_arn, subscription_arn, endpoint_url,
    server). All started servers are stopped on teardown.
    """
    # This fixture can be used with manual setup to expose the HTTPServer fixture to AWS. One example is to use a
    # a service like localhost.run, and set up a specific port to start the `HTTPServer(port=40000)` for example,
    # and tunnel `localhost:40000` to a specific domain that you can manually return from this fixture.
    http_servers = []

    def _create_http_endpoint(
        raw_message_delivery: bool = False,
    ) -> tuple[str, str, str, HTTPServer]:
        server = HTTPServer()
        server.start()
        http_servers.append(server)
        # the endpoint answers any request to /sns-endpoint with HTTP 200
        server.expect_request("/sns-endpoint").respond_with_data(status=200)
        endpoint_url = server.url_for("/sns-endpoint")
        wait_for_port_open(endpoint_url)

        topic_arn = sns_create_topic()["TopicArn"]
        subscription = sns_subscription(TopicArn=topic_arn, Protocol="http", Endpoint=endpoint_url)
        subscription_arn = subscription["SubscriptionArn"]
        # minimize retry counts/delays so delivery failures surface quickly in tests
        delivery_policy = {
            "healthyRetryPolicy": {
                "minDelayTarget": 1,
                "maxDelayTarget": 1,
                "numRetries": 0,
                "numNoDelayRetries": 0,
                "numMinDelayRetries": 0,
                "numMaxDelayRetries": 0,
                "backoffFunction": "linear",
            },
            "sicklyRetryPolicy": None,
            "throttlePolicy": {"maxReceivesPerSecond": 1000},
            "guaranteed": False,
        }
        aws_client.sns.set_subscription_attributes(
            SubscriptionArn=subscription_arn,
            AttributeName="DeliveryPolicy",
            AttributeValue=json.dumps(delivery_policy),
        )

        if raw_message_delivery:
            aws_client.sns.set_subscription_attributes(
                SubscriptionArn=subscription_arn,
                AttributeName="RawMessageDelivery",
                AttributeValue="true",
            )

        return topic_arn, subscription_arn, endpoint_url, server

    yield _create_http_endpoint

    # teardown: stop any servers that are still running
    for http_server in http_servers:
        if http_server.is_running():
            http_server.stop()
680

681

682
@pytest.fixture
def route53_hosted_zone(aws_client):
    """Factory fixture creating Route53 hosted zones; deletes them on teardown.

    Generates defaults for ``Name`` and ``CallerReference`` when not provided.
    """
    hosted_zones = []

    def factory(**kwargs):
        if "Name" not in kwargs:
            kwargs["Name"] = f"www.{short_uid()}.com."
        if "CallerReference" not in kwargs:
            kwargs["CallerReference"] = f"caller-ref-{short_uid()}"
        # fix: forward all kwargs (e.g. HostedZoneConfig, VPC) instead of silently
        # dropping everything except Name and CallerReference
        response = aws_client.route53.create_hosted_zone(**kwargs)
        hosted_zones.append(response["HostedZone"]["Id"])
        return response

    yield factory

    for zone in hosted_zones:
        try:
            aws_client.route53.delete_hosted_zone(Id=zone)
        except Exception as e:
            LOG.debug("error cleaning up route53 HostedZone %s: %s", zone, e)
704

705

706
@pytest.fixture
def transcribe_create_job(s3_bucket, aws_client):
    """Return a factory that uploads an audio file to S3 and starts a Transcribe
    job, returning the job name. Created jobs are deleted on teardown (best effort).
    """
    job_names = []

    def _create_job(audio_file: str, params: dict[str, Any] | None = None) -> str:
        s3_key = "test-clip.wav"

        if not params:
            params = {}

        if "TranscriptionJobName" not in params:
            params["TranscriptionJobName"] = f"test-transcribe-{short_uid()}"

        if "LanguageCode" not in params:
            params["LanguageCode"] = "en-GB"

        if "Media" not in params:
            params["Media"] = {"MediaFileUri": f"s3://{s3_bucket}/{s3_key}"}

        # upload test wav to a s3 bucket
        # NOTE(review): the upload happens even when a custom "Media" was passed;
        # callers may rely on the default key still being populated - confirm
        # before changing this
        with open(audio_file, "rb") as f:
            aws_client.s3.upload_fileobj(f, s3_bucket, s3_key)

        response = aws_client.transcribe.start_transcription_job(**params)

        job_name = response["TranscriptionJob"]["TranscriptionJobName"]
        job_names.append(job_name)

        return job_name

    yield _create_job

    # teardown: best-effort deletion of all started jobs
    for job_name in job_names:
        with contextlib.suppress(ClientError):
            aws_client.transcribe.delete_transcription_job(TranscriptionJobName=job_name)
741

742

743
@pytest.fixture
def kinesis_create_stream(aws_client):
    """Factory fixture creating Kinesis streams; deletes them (including their
    consumers) on teardown."""
    created_streams = []

    def _create_stream(**kwargs):
        stream_name = kwargs.setdefault("StreamName", f"test-stream-{short_uid()}")
        aws_client.kinesis.create_stream(**kwargs)
        created_streams.append(stream_name)
        return stream_name

    yield _create_stream

    # cleanup
    for stream_name in created_streams:
        try:
            aws_client.kinesis.delete_stream(StreamName=stream_name, EnforceConsumerDeletion=True)
        except Exception as e:
            LOG.debug("error cleaning up kinesis stream %s: %s", stream_name, e)
761

762

763
@pytest.fixture
def wait_for_stream_ready(aws_client):
    """Return a helper polling until a Kinesis stream is ACTIVE or UPDATING."""

    def _wait_for_stream_ready(stream_name: str):
        def is_stream_ready():
            description = aws_client.kinesis.describe_stream(StreamName=stream_name)
            status = description["StreamDescription"]["StreamStatus"]
            return status in ("ACTIVE", "UPDATING")

        return poll_condition(is_stream_ready)

    return _wait_for_stream_ready
776

777

778
@pytest.fixture
def wait_for_delivery_stream_ready(aws_client):
    """Return a helper polling until a Firehose delivery stream is ACTIVE.

    Returns the ``poll_condition`` result (True iff the stream became ACTIVE in
    time), consistent with ``wait_for_stream_ready``.
    """

    def _wait_for_stream_ready(delivery_stream_name: str):
        def is_stream_ready():
            describe_stream_response = aws_client.firehose.describe_delivery_stream(
                DeliveryStreamName=delivery_stream_name
            )
            return (
                describe_stream_response["DeliveryStreamDescription"]["DeliveryStreamStatus"]
                == "ACTIVE"
            )

        # consistency fix: return the poll result like the sibling wait_* fixtures
        # (previously discarded, implicitly returning None)
        return poll_condition(is_stream_ready)

    return _wait_for_stream_ready
793

794

795
@pytest.fixture
def wait_for_dynamodb_stream_ready(aws_client):
    """Return a helper polling until a DynamoDB stream reports status ENABLED."""

    def _wait_for_stream_ready(stream_arn: str, client=None):
        def is_stream_ready():
            ddb_streams = client or aws_client.dynamodbstreams
            description = ddb_streams.describe_stream(StreamArn=stream_arn)
            return description["StreamDescription"]["StreamStatus"] == "ENABLED"

        return poll_condition(is_stream_ready)

    return _wait_for_stream_ready
806

807

808
@pytest.fixture()
def kms_create_key(aws_client_factory):
    """Factory fixture creating KMS keys, optionally in a specific region.

    On teardown, every created key is scheduled for deletion with the minimum
    pending window (7 days).
    """
    key_ids = []

    def _create_key(region_name: str = None, **kwargs):
        if "Description" not in kwargs:
            kwargs["Description"] = f"test description - {short_uid()}"
        key_metadata = aws_client_factory(region_name=region_name).kms.create_key(**kwargs)[
            "KeyMetadata"
        ]
        # remember the region so teardown can target the matching regional client
        key_ids.append((region_name, key_metadata["KeyId"]))
        return key_metadata

    yield _create_key

    for region_name, key_id in key_ids:
        try:
            # shortest amount of time you can schedule the deletion
            aws_client_factory(region_name=region_name).kms.schedule_key_deletion(
                KeyId=key_id, PendingWindowInDays=7
            )
        except Exception as e:
            exception_message = str(e)
            # Some tests schedule their keys for deletion themselves.
            # Only stay silent when the error is a KMSInvalidStateException about a
            # pending deletion; log anything else (condition is De Morgan of
            # "not (A and B)").
            if (
                "KMSInvalidStateException" not in exception_message
                or "is pending deletion" not in exception_message
            ):
                LOG.debug("error cleaning up KMS key %s: %s", key_id, e)
837

838

839
@pytest.fixture()
def kms_replicate_key(aws_client_factory):
    """Factory fixture replicating KMS keys; replicas are scheduled for deletion on teardown."""
    replicas: list[tuple[str, str]] = []

    def _replicate_key(region_from=None, **kwargs):
        replicas.append((kwargs.get("ReplicaRegion"), kwargs.get("KeyId")))
        return aws_client_factory(region_name=region_from).kms.replicate_key(**kwargs)

    yield _replicate_key

    for region_to, key_id in replicas:
        try:
            # 7 days is the shortest deletion window KMS allows
            aws_client_factory(region_name=region_to).kms.schedule_key_deletion(
                KeyId=key_id, PendingWindowInDays=7
            )
        except Exception as e:
            LOG.debug("error cleaning up KMS key %s: %s", key_id, e)

859

860
# kms_create_key fixture is used here not just to be able to create aliases without a key specified,
861
# but also to make sure that kms_create_key gets executed before and teared down after kms_create_alias -
862
# to make sure that we clean up aliases before keys get cleaned up.
863
@pytest.fixture()
def kms_create_alias(kms_create_key, aws_client):
    """Factory fixture creating KMS aliases (and backing keys if needed); deletes aliases on teardown."""
    created_aliases = []

    def _create_alias(**kwargs):
        if "AliasName" not in kwargs:
            kwargs["AliasName"] = f"alias/{short_uid()}"
        if "TargetKeyId" not in kwargs:
            # lazily create a backing key only when the caller did not supply one
            kwargs["TargetKeyId"] = kms_create_key()["KeyId"]

        aws_client.kms.create_alias(**kwargs)
        alias_name = kwargs["AliasName"]
        created_aliases.append(alias_name)
        return alias_name

    yield _create_alias

    for alias in created_aliases:
        try:
            aws_client.kms.delete_alias(AliasName=alias)
        except Exception as e:
            LOG.debug("error cleaning up KMS alias %s: %s", alias, e)
884

885

886
@pytest.fixture()
def kms_create_grant(kms_create_key, aws_client):
    """Factory fixture creating KMS grants; retires all created grants on teardown."""
    created_grants = []

    def _create_grant(**kwargs):
        # Just a random ARN, since KMS in LocalStack currently doesn't validate GranteePrincipal,
        # but some GranteePrincipal is required to create a grant.
        GRANTEE_PRINCIPAL_ARN = (
            "arn:aws:kms:eu-central-1:123456789876:key/198a5a78-52c3-489f-ac70-b06a4d11027a"
        )

        kwargs.setdefault("Operations", ["Decrypt", "Encrypt"])
        kwargs.setdefault("GranteePrincipal", GRANTEE_PRINCIPAL_ARN)
        if "KeyId" not in kwargs:
            # only create a backing key when the caller did not supply one
            kwargs["KeyId"] = kms_create_key()["KeyId"]

        grant_id = aws_client.kms.create_grant(**kwargs)["GrantId"]
        created_grants.append((grant_id, kwargs["KeyId"]))
        return grant_id, kwargs["KeyId"]

    yield _create_grant

    for grant_id, key_id in created_grants:
        try:
            aws_client.kms.retire_grant(GrantId=grant_id, KeyId=key_id)
        except Exception as e:
            LOG.debug("error cleaning up KMS grant %s: %s", grant_id, e)

916

917
@pytest.fixture
def kms_key(kms_create_key):
    """A single KMS key created with default parameters; returns its KeyMetadata."""
    key_metadata = kms_create_key()
    return key_metadata
920

921

922
@pytest.fixture
def kms_grant_and_key(kms_key, aws_client):
    """Return [grant creation response, key metadata] for a grant to the current caller identity."""
    caller_arn = aws_client.sts.get_caller_identity()["Arn"]

    grant = aws_client.kms.create_grant(
        KeyId=kms_key["KeyId"],
        GranteePrincipal=caller_arn,
        Operations=["Decrypt", "Encrypt"],
    )
    return [grant, kms_key]
934

935

936
@pytest.fixture
def opensearch_wait_for_cluster(aws_client):
    """Factory fixture: block until an OpenSearch domain finishes processing and has an endpoint."""

    def _wait_for_cluster(domain_name: str):
        def finished_processing():
            status = aws_client.opensearch.describe_domain(DomainName=domain_name)["DomainStatus"]
            return status["DomainProcessingStatus"] == "Active" and "Endpoint" in status

        # on AWS, cluster creation is slow — poll less aggressively there
        poll_kwargs = {"interval": 10} if is_aws_cloud() else {}
        started = poll_condition(finished_processing, timeout=25 * 60, **poll_kwargs)
        assert started, f"could not start domain: {domain_name}"

    return _wait_for_cluster
948

949

950
@pytest.fixture
def opensearch_create_domain(opensearch_wait_for_cluster, aws_client):
    """Factory fixture creating OpenSearch domains; deletes them on teardown."""
    created_domains = []

    def factory(**kwargs) -> str:
        if "DomainName" not in kwargs:
            kwargs["DomainName"] = f"test-domain-{short_uid()}"
        domain_name = kwargs["DomainName"]

        aws_client.opensearch.create_domain(**kwargs)
        opensearch_wait_for_cluster(domain_name=domain_name)
        created_domains.append(domain_name)
        return domain_name

    yield factory

    # cleanup
    for domain in created_domains:
        try:
            aws_client.opensearch.delete_domain(DomainName=domain)
        except Exception as e:
            LOG.debug("error cleaning up domain %s: %s", domain, e)
973

974

975
@pytest.fixture
def opensearch_domain(opensearch_create_domain) -> str:
    """A single OpenSearch domain created with default parameters; returns its name."""
    domain_name = opensearch_create_domain()
    return domain_name
978

979

980
@pytest.fixture
def opensearch_endpoint(opensearch_domain, aws_client) -> str:
    """HTTPS endpoint URL of the fixture-created OpenSearch domain."""
    status = aws_client.opensearch.describe_domain(DomainName=opensearch_domain)["DomainStatus"]
    assert "Endpoint" in status
    endpoint = status["Endpoint"]
    return f"https://{endpoint}"
985

986

987
@pytest.fixture
def opensearch_document_path(opensearch_endpoint, aws_client):
    """Index a sample document into the fixture domain and return its URL."""
    document = {
        "first_name": "Boba",
        "last_name": "Fett",
        "age": 41,
        "about": "I'm just a simple man, trying to make my way in the universe.",
        "interests": ["mandalorian armor", "tusken culture"],
    }
    document_path = f"{opensearch_endpoint}/bountyhunters/_doc/1"
    headers = {"content-type": "application/json", "Accept-encoding": "identity"}
    response = requests.put(document_path, data=json.dumps(document), headers=headers)
    assert response.status_code == 201, f"could not create document at: {document_path}"
    return document_path
1004

1005

1006
# Cleanup fixtures
1007
@pytest.fixture
def cleanup_stacks(aws_client):
    """Return a callable that best-effort deletes the given CloudFormation stacks."""

    def _cleanup_stacks(stacks: list[str]) -> None:
        for stack in ensure_list(stacks):
            try:
                aws_client.cloudformation.delete_stack(StackName=stack)
                aws_client.cloudformation.get_waiter("stack_delete_complete").wait(StackName=stack)
            except Exception:
                LOG.debug("Failed to cleanup stack '%s'", stack)

    return _cleanup_stacks
1019

1020

1021
@pytest.fixture
def cleanup_changesets(aws_client):
    """Return a callable that best-effort deletes the given CloudFormation change sets."""

    def _cleanup_changesets(changesets: list[str]) -> None:
        for changeset in ensure_list(changesets):
            try:
                aws_client.cloudformation.delete_change_set(ChangeSetName=changeset)
            except Exception:
                LOG.debug("Failed to cleanup changeset '%s'", changeset)

    return _cleanup_changesets
1032

1033

1034
# Helpers for Cfn
1035

1036

1037
# TODO: exports(!)
1038
@dataclasses.dataclass(frozen=True)
class DeployResult:
    """Immutable handle for a stack deployed via the ``deploy_cfn_template`` fixture."""

    # identifiers of the executed change set and the resulting stack
    change_set_id: str
    stack_id: str
    stack_name: str
    change_set_name: str
    # stack outputs mapped OutputKey -> OutputValue
    outputs: dict[str, str]

    # callable that deletes the stack and waits for deletion to complete
    destroy: Callable[[], None]
1047

1048

1049
class StackDeployError(Exception):
    """Raised when a CloudFormation stack fails to reach its target state.

    The message combines the stack's describe output with the (failing)
    stack events; CFN_VERBOSE_ERRORS switches to reporting all events.
    """

    def __init__(self, describe_res: dict, events: list[dict]):
        self.describe_result = describe_res
        self.events = events

        encoded_describe_output = json.dumps(self.describe_result, cls=CustomEncoder)
        events_label = "Events" if config.CFN_VERBOSE_ERRORS else "Failing resources"
        msg = (
            f"Describe output:\n{encoded_describe_output}"
            f"\n{events_label}:\n{self.format_events(events)}"
        )
        super().__init__(msg)

    def format_events(self, events: list[dict]) -> str:
        """Render failing (or, in verbose mode, all) events in chronological order."""
        ordered = sorted(events, key=lambda event: event["Timestamp"])
        lines = [
            self.format_event(event)
            for event in ordered
            if event["ResourceStatus"].endswith("FAILED") or config.CFN_VERBOSE_ERRORS
        ]
        return "\n".join(lines)

    @staticmethod
    def format_event(event: dict) -> str:
        """Render a single stack event as a one-line summary."""
        base = f"- {event['LogicalResourceId']} ({event['ResourceType']}) -> {event['ResourceStatus']}"
        if reason := event.get("ResourceStatusReason"):
            flat_reason = reason.replace("\n", "; ")
            return f"{base} ({flat_reason})"
        return base
1079

1080

1081
@pytest.fixture
def deploy_cfn_template(
    aws_client: ServiceLevelClientFactory,
):
    """Factory fixture deploying CloudFormation templates via change sets.

    All deployed stacks are destroyed on teardown in reverse creation order
    (in case of inter-stack dependencies), unless already destroyed by the test.
    """
    # (stack_id, destroy callable) per deployed stack, in creation order
    state: list[tuple[str, Callable]] = []

    def _deploy(
        *,
        is_update: bool | None = False,
        stack_name: str | None = None,
        change_set_name: str | None = None,
        template: str | None = None,
        template_path: str | os.PathLike | None = None,
        template_mapping: dict[str, Any] | None = None,
        parameters: dict[str, str] | None = None,
        role_arn: str | None = None,
        max_wait: int | None = None,
        delay_between_polls: int | None = 2,
        custom_aws_client: ServiceLevelClientFactory | None = None,
        raw_parameters: list[Parameter] | None = None,
    ) -> DeployResult:
        # updates must target an existing stack, so its name is mandatory
        if is_update:
            assert stack_name
        stack_name = stack_name or f"stack-{short_uid()}"
        change_set_name = change_set_name or f"change-set-{short_uid()}"

        if max_wait is None:
            # AWS deployments are much slower than LocalStack ones
            max_wait = 1800 if is_aws_cloud() else 180

        if template_path is not None:
            template = load_template_file(template_path)
            if template is None:
                raise RuntimeError(f"Could not find file {os.path.realpath(template_path)}")
        # substitute template_mapping placeholders into the template body
        template_rendered = render_template(template, **(template_mapping or {}))

        kwargs = CreateChangeSetInput(
            StackName=stack_name,
            ChangeSetName=change_set_name,
            TemplateBody=template_rendered,
            Capabilities=["CAPABILITY_AUTO_EXPAND", "CAPABILITY_IAM", "CAPABILITY_NAMED_IAM"],
            ChangeSetType=("UPDATE" if is_update else "CREATE"),
        )
        kwargs["Parameters"] = []
        # `parameters` (dict) takes precedence over `raw_parameters` (pre-built list)
        if parameters:
            kwargs["Parameters"] = [
                Parameter(ParameterKey=k, ParameterValue=v) for (k, v) in parameters.items()
            ]
        elif raw_parameters:
            kwargs["Parameters"] = raw_parameters

        if role_arn is not None:
            kwargs["RoleARN"] = role_arn

        # allow deploying with a different account/region/credential set
        cfn_aws_client = custom_aws_client if custom_aws_client is not None else aws_client

        response = cfn_aws_client.cloudformation.create_change_set(**kwargs)

        change_set_id = response["Id"]
        stack_id = response["StackId"]

        try:
            cfn_aws_client.cloudformation.get_waiter(WAITER_CHANGE_SET_CREATE_COMPLETE).wait(
                ChangeSetName=change_set_id
            )
        except botocore.exceptions.WaiterError as e:
            # surface the change set failure reason instead of a generic waiter error
            change_set = cfn_aws_client.cloudformation.describe_change_set(
                ChangeSetName=change_set_id
            )
            raise Exception(f"{change_set['Status']}: {change_set.get('StatusReason')}") from e

        cfn_aws_client.cloudformation.execute_change_set(ChangeSetName=change_set_id)
        stack_waiter = cfn_aws_client.cloudformation.get_waiter(
            WAITER_STACK_UPDATE_COMPLETE if is_update else WAITER_STACK_CREATE_COMPLETE
        )

        try:
            stack_waiter.wait(
                StackName=stack_id,
                WaiterConfig={
                    "Delay": delay_between_polls,
                    "MaxAttempts": max_wait / delay_between_polls,
                },
            )
        except botocore.exceptions.WaiterError as e:
            # wrap into StackDeployError so the failing resources/events are readable
            raise StackDeployError(
                cfn_aws_client.cloudformation.describe_stacks(StackName=stack_id)["Stacks"][0],
                cfn_aws_client.cloudformation.describe_stack_events(StackName=stack_id)[
                    "StackEvents"
                ],
            ) from e

        describe_stack_res = cfn_aws_client.cloudformation.describe_stacks(StackName=stack_id)[
            "Stacks"
        ][0]
        outputs = describe_stack_res.get("Outputs", [])

        mapped_outputs = {o["OutputKey"]: o.get("OutputValue") for o in outputs}

        def _destroy_stack():
            # delete and wait for completion, reusing the caller's timing knobs
            cfn_aws_client.cloudformation.delete_stack(StackName=stack_id)
            cfn_aws_client.cloudformation.get_waiter(WAITER_STACK_DELETE_COMPLETE).wait(
                StackName=stack_id,
                WaiterConfig={
                    "Delay": delay_between_polls,
                    "MaxAttempts": max_wait / delay_between_polls,
                },
            )

        state.append((stack_id, _destroy_stack))

        return DeployResult(
            change_set_id, stack_id, stack_name, change_set_name, mapped_outputs, _destroy_stack
        )

    yield _deploy

    # delete the stacks in the reverse order they were created in case of inter-stack dependencies
    for stack_id, teardown in state[::-1]:
        try:
            teardown()
        except Exception as e:
            LOG.debug("Failed cleaning up stack stack_id=%s: %s", stack_id, e)
1203

1204

1205
@pytest.fixture
def is_change_set_created_and_available(aws_client):
    """Predicate factory: change set has Status=CREATE_COMPLETE and ExecutionStatus=AVAILABLE."""

    def _is_change_set_created_and_available(change_set_id: str):
        def _inner():
            change_set = aws_client.cloudformation.describe_change_set(ChangeSetName=change_set_id)
            # TODO: CREATE_FAILED should also not lead to further retries
            created = change_set.get("Status") == "CREATE_COMPLETE"
            available = change_set.get("ExecutionStatus") == "AVAILABLE"
            return created and available

        return _inner

    return _is_change_set_created_and_available
1219

1220

1221
@pytest.fixture
def is_change_set_failed_and_unavailable(aws_client):
    """Predicate factory: change set has Status=FAILED and ExecutionStatus=UNAVAILABLE."""

    # Renamed from the copy-pasted `_is_change_set_created_and_available` so the
    # returned callable's name matches what it actually checks.
    def _is_change_set_failed_and_unavailable(change_set_id: str):
        def _inner():
            change_set = aws_client.cloudformation.describe_change_set(ChangeSetName=change_set_id)
            return (
                change_set.get("Status") == "FAILED"
                and change_set.get("ExecutionStatus") == "UNAVAILABLE"
            )

        return _inner

    return _is_change_set_failed_and_unavailable
1235

1236

1237
@pytest.fixture
def is_stack_created(aws_client):
    """Predicate factory: stack reached a terminal create status."""
    terminal_statuses = ["CREATE_COMPLETE", "CREATE_FAILED"]
    return _has_stack_status(aws_client.cloudformation, terminal_statuses)
1240

1241

1242
@pytest.fixture
def is_stack_updated(aws_client):
    """Predicate factory: stack reached a terminal update status."""
    terminal_statuses = ["UPDATE_COMPLETE", "UPDATE_FAILED"]
    return _has_stack_status(aws_client.cloudformation, terminal_statuses)
1245

1246

1247
@pytest.fixture
def is_stack_deleted(aws_client):
    """Predicate factory: stack deletion completed."""
    terminal_statuses = ["DELETE_COMPLETE"]
    return _has_stack_status(aws_client.cloudformation, terminal_statuses)
1250

1251

1252
def _has_stack_status(cfn_client, statuses: list[str]):
1✔
1253
    def _has_status(stack_id: str):
1✔
1254
        def _inner():
1✔
1255
            resp = cfn_client.describe_stacks(StackName=stack_id)
1✔
1256
            s = resp["Stacks"][0]  # since the lookup  uses the id we can only get a single response
1✔
1257
            return s.get("StackStatus") in statuses
1✔
1258

1259
        return _inner
1✔
1260

1261
    return _has_status
1✔
1262

1263

1264
@pytest.fixture
def is_change_set_finished(aws_client):
    """Predicate factory: change set executed successfully (short-circuits on EXECUTE_FAILED)."""

    def _is_change_set_finished(change_set_id: str, stack_name: str | None = None):
        def _inner():
            describe_kwargs = {"ChangeSetName": change_set_id}
            if stack_name:
                describe_kwargs["StackName"] = stack_name

            change_set = aws_client.cloudformation.describe_change_set(**describe_kwargs)
            execution_status = change_set.get("ExecutionStatus")

            if execution_status == "EXECUTE_FAILED":
                # abort the surrounding wait loop instead of retrying forever
                LOG.warning("Change set failed")
                raise ShortCircuitWaitException()

            return execution_status == "EXECUTE_COMPLETE"

        return _inner

    return _is_change_set_finished
1283

1284

1285
@pytest.fixture
def wait_until_lambda_ready(aws_client):
    """Factory fixture: wait until a Lambda function leaves the Pending state.

    :param function_name: function name, ARN, or partial ARN
    :param qualifier: optional version or alias to check
    :param client: optional Lambda client (defaults to the primary test client)
    """

    def _wait_until_ready(function_name: str, qualifier: str = None, client=None):
        client = client or aws_client.lambda_

        def _is_not_pending():
            kwargs = {}
            if qualifier:
                kwargs["Qualifier"] = qualifier
            try:
                # bugfix: forward the qualifier to get_function — previously the
                # `kwargs` dict was built but never passed, so `qualifier` was ignored
                result = (
                    client.get_function(FunctionName=function_name, **kwargs)["Configuration"][
                        "State"
                    ]
                    != "Pending"
                )
                LOG.debug("lambda state result: result=%s", result)
                return result
            except Exception as e:
                LOG.error(e)
                raise

        wait_until(_is_not_pending)

    return _wait_until_ready
1308

1309

1310
role_assume_policy = """
1✔
1311
{
1312
  "Version": "2012-10-17",
1313
  "Statement": [
1314
    {
1315
      "Effect": "Allow",
1316
      "Principal": {
1317
        "Service": "lambda.amazonaws.com"
1318
      },
1319
      "Action": "sts:AssumeRole"
1320
    }
1321
  ]
1322
}
1323
""".strip()
1324

1325
role_policy = """
1✔
1326
{
1327
    "Version": "2012-10-17",
1328
    "Statement": [
1329
        {
1330
            "Effect": "Allow",
1331
            "Action": [
1332
                "logs:CreateLogGroup",
1333
                "logs:CreateLogStream",
1334
                "logs:PutLogEvents"
1335
            ],
1336
            "Resource": [
1337
                "*"
1338
            ]
1339
        }
1340
    ]
1341
}
1342
""".strip()
1343

1344

1345
@pytest.fixture
def create_lambda_function_aws(aws_client):
    """Factory fixture creating Lambda functions via the raw API; deletes them on teardown."""
    created_arns = []

    def _create_lambda_function(**kwargs):
        def _create_function():
            response = aws_client.lambda_.create_function(**kwargs)
            created_arns.append(response["FunctionArn"])

            def _is_not_pending():
                try:
                    state = aws_client.lambda_.get_function(FunctionName=response["FunctionName"])[
                        "Configuration"
                    ]["State"]
                    return state != "Pending"
                except Exception as e:
                    LOG.error(e)
                    raise

            wait_until(_is_not_pending)
            return response

        # @AWS, takes about 10s until the role/policy is "active", until then it will fail
        # localstack should normally not require the retries and will just continue here
        return retry(_create_function, retries=3, sleep=4)

    yield _create_lambda_function

    for arn in created_arns:
        try:
            aws_client.lambda_.delete_function(FunctionName=arn)
        except Exception:
            LOG.debug("Unable to delete function arn=%s in cleanup", arn)
1381

1382

1383
@pytest.fixture
def create_lambda_function(aws_client, wait_until_lambda_ready, lambda_su_role):
    """Factory fixture wrapping testutil.create_lambda_function; removes functions and their
    log groups on teardown."""
    created_functions = []  # (function ARN, client used to create it)
    created_log_groups = []
    default_lambda_client = aws_client.lambda_
    logs_client = aws_client.logs
    s3_client = aws_client.s3

    def _create_lambda_function(*args, **kwargs):
        client = kwargs.get("client") or default_lambda_client
        kwargs["client"] = client
        kwargs["s3_client"] = s3_client
        func_name = kwargs.pop("func_name", None)
        assert func_name

        if not kwargs.get("role"):
            kwargs["role"] = lambda_su_role

        def _create_function():
            response = testutil.create_lambda_function(func_name, **kwargs)
            created_functions.append((response["CreateFunctionResponse"]["FunctionArn"], client))
            wait_until_lambda_ready(function_name=func_name, client=client)
            created_log_groups.append(f"/aws/lambda/{func_name}")
            return response

        # @AWS, takes about 10s until the role/policy is "active", until then it will fail
        # localstack should normally not require the retries and will just continue here
        return retry(_create_function, retries=3, sleep=4)

    yield _create_lambda_function

    for arn, client in created_functions:
        try:
            client.delete_function(FunctionName=arn)
        except Exception:
            LOG.debug("Unable to delete function arn=%s in cleanup", arn)

    for log_group_name in created_log_groups:
        try:
            logs_client.delete_log_group(logGroupName=log_group_name)
        except Exception:
            LOG.debug("Unable to delete log group %s in cleanup", log_group_name)
1427

1428

1429
@pytest.fixture
def lambda_is_function_deleted(aws_client):
    """Example usage:
    wait_until(lambda_is_function_deleted(function_name))
    wait_until(lambda_is_function_deleted(function_name, Qualifier="my-alias"))

    function_name can be a function name, function ARN, or partial function ARN.
    """
    checker_factory = _lambda_is_function_deleted(aws_client.lambda_)
    return checker_factory
1438

1439

1440
def _lambda_is_function_deleted(lambda_client):
1✔
1441
    def _is_function_deleted(
×
1442
        function_name: str,
1443
        **kwargs,
1444
    ) -> Callable[[], bool]:
1445
        def _inner() -> bool:
×
1446
            try:
×
1447
                lambda_client.get_function(FunctionName=function_name, **kwargs)
×
1448
                return False
×
1449
            except lambda_client.exceptions.ResourceNotFoundException:
×
1450
                return True
×
1451

1452
        return _inner
×
1453

1454
    return _is_function_deleted
×
1455

1456

1457
@pytest.fixture
def create_echo_http_server(aws_client, create_lambda_function):
    """Factory fixture deploying a Lambda-backed HTTP echo server behind a function URL."""
    from localstack.aws.api.lambda_ import Runtime

    lambda_client = aws_client.lambda_
    # Handler source deployed as the echo Lambda; it mirrors the incoming request
    # back to the caller. TRIM_X_HEADERS controls whether the handler strips the
    # x-amzn*/x-forwarded-* headers that Lambda adds automatically.
    handler_code = textwrap.dedent(
        """
    import json
    import os


    def make_response(body: dict, status_code: int = 200):
        return {
            "statusCode": status_code,
            "headers": {"Content-Type": "application/json"},
            "body": body,
        }


    def trim_headers(headers):
        if not int(os.getenv("TRIM_X_HEADERS", 0)):
            return headers
        return {
            key: value for key, value in headers.items()
            if not (key.startswith("x-amzn") or key.startswith("x-forwarded-"))
        }


    def handler(event, context):
        print(json.dumps(event))
        response = {
            "args": event.get("queryStringParameters", {}),
            "data": event.get("body", ""),
            "domain": event["requestContext"].get("domainName", ""),
            "headers": trim_headers(event.get("headers", {})),
            "method": event["requestContext"]["http"].get("method", ""),
            "origin": event["requestContext"]["http"].get("sourceIp", ""),
            "path": event["requestContext"]["http"].get("path", ""),
        }
        return make_response(response)"""
    )

    def _create_echo_http_server(trim_x_headers: bool = False) -> str:
        """Creates a server that will echo any request. Any request will be returned with the
        following format. Any unset values will have those defaults.
        `trim_x_headers` can be set to True to trim some headers that are automatically added by lambda in
        order to create easier Snapshot testing. Default: `False`
        {
          "args": {},
          "headers": {},
          "data": "",
          "method": "",
          "domain": "",
          "origin": "",
          "path": ""
        }"""
        zip_file = testutil.create_lambda_archive(handler_code, get_content=True)
        func_name = f"echo-http-{short_uid()}"
        create_lambda_function(
            func_name=func_name,
            zip_file=zip_file,
            runtime=Runtime.python3_12,
            envvars={"TRIM_X_HEADERS": "1" if trim_x_headers else "0"},
        )
        # expose the function via a public (unauthenticated) function URL
        url_response = lambda_client.create_function_url_config(
            FunctionName=func_name, AuthType="NONE"
        )
        aws_client.lambda_.add_permission(
            FunctionName=func_name,
            StatementId="urlPermission",
            Action="lambda:InvokeFunctionUrl",
            Principal="*",
            FunctionUrlAuthType="NONE",
        )
        return url_response["FunctionUrl"]

    # function cleanup is handled by the create_lambda_function fixture
    yield _create_echo_http_server
1534

1535

1536
@pytest.fixture
def create_event_source_mapping(aws_client):
    """Factory fixture creating Lambda event source mappings; deletes them on teardown."""
    created_uuids = []

    def _create_event_source_mapping(*args, **kwargs):
        response = aws_client.lambda_.create_event_source_mapping(*args, **kwargs)
        created_uuids.append(response["UUID"])
        return response

    yield _create_event_source_mapping

    for mapping_uuid in created_uuids:
        try:
            aws_client.lambda_.delete_event_source_mapping(UUID=mapping_uuid)
        except aws_client.lambda_.exceptions.ResourceNotFoundException:
            # already removed by the test itself
            pass
        except Exception as ex:
            LOG.debug("Unable to delete event source mapping %s in cleanup: %s", mapping_uuid, ex)
1554

1555

1556
@pytest.fixture
def check_lambda_logs(aws_client):
    """Return a callable asserting that a function's logs contain the expected lines.

    Entries containing ".*" are treated as regex patterns, everything else as
    exact messages. Returns all collected log messages.
    """

    def _check_logs(func_name: str, expected_lines: list[str] = None) -> list[str]:
        expected_lines = expected_lines or []
        log_events = get_lambda_logs(func_name, logs_client=aws_client.logs)
        log_messages = [entry["message"] for entry in log_events]
        for expected in expected_lines:
            if ".*" in expected:
                matches = (re.match(expected, message, flags=re.DOTALL) for message in log_messages)
                if any(matches):
                    continue
            assert expected in log_messages
        return log_messages

    return _check_logs
1572

1573

1574
@pytest.fixture
def create_policy(aws_client):
    """Factory fixture creating IAM policies; deletes them on teardown."""
    created_policies = []  # (policy ARN, client used to create it)

    def _create_policy(*args, iam_client=None, **kwargs):
        client = iam_client or aws_client.iam
        if "PolicyName" not in kwargs:
            kwargs["PolicyName"] = f"policy-{short_uid()}"
        response = client.create_policy(*args, **kwargs)
        created_policies.append((response["Policy"]["Arn"], client))
        return response

    yield _create_policy

    for policy_arn, client in created_policies:
        try:
            client.delete_policy(PolicyArn=policy_arn)
        except Exception:
            LOG.debug("Could not delete policy '%s' during test cleanup", policy_arn)
1594

1595

1596
@pytest.fixture
def create_user(aws_client):
    """Factory fixture to create IAM users; users and their dependent resources are removed on teardown."""
    usernames = []

    def _create_user(**kwargs):
        # autogenerate a user name unless the caller provided one
        if "UserName" not in kwargs:
            kwargs["UserName"] = f"user-{short_uid()}"
        response = aws_client.iam.create_user(**kwargs)
        usernames.append(response["User"]["UserName"])
        return response

    yield _create_user

    # Teardown: dependent resources (inline policies, attached policies, access keys)
    # are removed best-effort before deleting the user itself, since DeleteUser
    # fails while such dependencies remain.
    for username in usernames:
        try:
            inline_policies = aws_client.iam.list_user_policies(UserName=username)["PolicyNames"]
        except ClientError as e:
            # the user is most likely gone already - skip all remaining cleanup for it
            LOG.debug(
                "Cannot list user policies: %s. User %s probably already deleted...", e, username
            )
            continue

        # 1) delete inline policies
        for inline_policy in inline_policies:
            try:
                aws_client.iam.delete_user_policy(UserName=username, PolicyName=inline_policy)
            except Exception:
                LOG.debug(
                    "Could not delete user policy '%s' from '%s' during cleanup",
                    inline_policy,
                    username,
                )
        # 2) detach managed policies
        attached_policies = aws_client.iam.list_attached_user_policies(UserName=username)[
            "AttachedPolicies"
        ]
        for attached_policy in attached_policies:
            try:
                aws_client.iam.detach_user_policy(
                    UserName=username, PolicyArn=attached_policy["PolicyArn"]
                )
            except Exception:
                LOG.debug(
                    "Error detaching policy '%s' from user '%s'",
                    attached_policy["PolicyArn"],
                    username,
                )
        # 3) delete access keys
        access_keys = aws_client.iam.list_access_keys(UserName=username)["AccessKeyMetadata"]
        for access_key in access_keys:
            try:
                aws_client.iam.delete_access_key(
                    UserName=username, AccessKeyId=access_key["AccessKeyId"]
                )
            except Exception:
                LOG.debug(
                    "Error deleting access key '%s' from user '%s'",
                    access_key["AccessKeyId"],
                    username,
                )

        # 4) finally delete the user itself
        try:
            aws_client.iam.delete_user(UserName=username)
        except Exception as e:
            LOG.debug("Error deleting user '%s' during test cleanup: %s", username, e)

1659

1660
@pytest.fixture
def wait_and_assume_role(aws_client):
    """Returns a callable that assumes an IAM role via STS, retrying until AWS permits it."""

    def _wait_and_assume_role(role_arn: str, session_name: str = None, **kwargs):
        session_name = session_name or f"session-{short_uid()}"

        def _assume():
            response = aws_client.sts.assume_role(
                RoleArn=role_arn, RoleSessionName=session_name, **kwargs
            )
            return response["Credentials"]

        # need to retry a couple of times before we are allowed to assume this role in AWS
        return retry(_assume, sleep=5, retries=4)

    return _wait_and_assume_role
1676

1677

1678
@pytest.fixture
def create_role(aws_client):
    """Factory fixture to create IAM roles; roles and their policies are removed on teardown."""
    role_names = []

    def _create_role(iam_client=None, **kwargs):
        # autogenerate a role name unless the caller provided one; optionally use a
        # caller-supplied client (e.g. for another account/region)
        if not kwargs.get("RoleName"):
            kwargs["RoleName"] = f"role-{short_uid()}"
        iam_client = iam_client or aws_client.iam
        result = iam_client.create_role(**kwargs)
        role_names.append((result["Role"]["RoleName"], iam_client))
        return result

    yield _create_role

    # Teardown: detach/delete policies first (best effort), then delete the role,
    # using the same client each role was created with.
    for role_name, iam_client in role_names:
        # detach policies
        try:
            attached_policies = iam_client.list_attached_role_policies(RoleName=role_name)[
                "AttachedPolicies"
            ]
        except ClientError as e:
            # the role is most likely gone already - skip all remaining cleanup for it
            LOG.debug(
                "Cannot list attached role policies: %s. Role %s probably already deleted...",
                e,
                role_name,
            )
            continue
        for attached_policy in attached_policies:
            try:
                iam_client.detach_role_policy(
                    RoleName=role_name, PolicyArn=attached_policy["PolicyArn"]
                )
            except Exception:
                LOG.debug(
                    "Could not detach role policy '%s' from '%s' during cleanup",
                    attached_policy["PolicyArn"],
                    role_name,
                )
        # delete inline policies
        role_policies = iam_client.list_role_policies(RoleName=role_name)["PolicyNames"]
        for role_policy in role_policies:
            try:
                iam_client.delete_role_policy(RoleName=role_name, PolicyName=role_policy)
            except Exception:
                LOG.debug(
                    "Could not delete role policy '%s' from '%s' during cleanup",
                    role_policy,
                    role_name,
                )
        # finally delete the role itself
        try:
            iam_client.delete_role(RoleName=role_name)
        except Exception:
            LOG.debug("Could not delete role '%s' during cleanup", role_name)

1731

1732
@pytest.fixture
def create_parameter(aws_client):
    """Factory fixture to create SSM parameters; parameters are deleted on teardown.

    Fixes: the parameter name was registered for cleanup *before* `put_parameter`
    ran (registering names that were never created when the call fails), and the
    teardown raised if a test had already deleted the parameter, masking the test
    result. Cleanup is now best-effort, consistent with the other fixtures here.
    """
    params = []

    def _create_parameter(**kwargs):
        # register for cleanup only once creation actually succeeded
        response = aws_client.ssm.put_parameter(**kwargs)
        params.append(kwargs["Name"])
        return response

    yield _create_parameter

    for param in params:
        try:
            aws_client.ssm.delete_parameter(Name=param)
        except Exception as e:
            LOG.debug("Error deleting SSM parameter '%s' during cleanup: %s", param, e)
1744

1745

1746
@pytest.fixture
def create_secret(aws_client):
    """Factory fixture to create Secrets Manager secrets; force-deleted on teardown."""
    secret_arns = []

    def _create_secret(**kwargs):
        response = aws_client.secretsmanager.create_secret(**kwargs)
        secret_arns.append(response["ARN"])
        return response

    yield _create_secret

    for secret_arn in secret_arns:
        # skip the recovery window so the name is immediately reusable
        aws_client.secretsmanager.delete_secret(
            SecretId=secret_arn, ForceDeleteWithoutRecovery=True
        )

1760

1761
# TODO Figure out how to make cert creation tests pass against AWS.
1762
#
1763
# We would like to have localstack tests to pass not just against localstack, but also against AWS to make sure
1764
# our emulation is correct. Unfortunately, with certificate creation there are some issues.
1765
#
1766
# In AWS newly created ACM certificates have to be validated either by email or by DNS. The latter is
1767
# by adding some CNAME records as requested by AWS in response to a certificate request.
1768
# For testing purposes the DNS one seems to be easier, at least as long as DNS is handled by the Route53 AWS DNS service.
1769
#
1770
# The other possible option is to use IAM certificates instead of ACM ones. Those just have to be uploaded from files
1771
# created by openssl etc. Not sure if there are other issues after that.
1772
#
1773
# The third option might be having in AWS some certificates created in advance - so they do not require validation
1774
# and can be easily used in tests. The issue with such an approach is that for AppSync, for example, in order to
1775
# register a domain name (https://docs.aws.amazon.com/appsync/latest/APIReference/API_CreateDomainName.html),
1776
# the domain name in the API request has to match the domain name used in certificate creation. Which means that with
1777
# pre-created certificates we would have to use specific domain names instead of random ones.
1778
@pytest.fixture
def acm_request_certificate(aws_client_factory):
    """Factory fixture to request ACM certificates; certificates are deleted on teardown.

    Fix: the factory returns the full `request_certificate` response (a dict), not a
    string - the misleading `-> str` return annotation was corrected accordingly.
    """
    certificate_arns = []

    def factory(**kwargs) -> dict:
        if "DomainName" not in kwargs:
            kwargs["DomainName"] = f"test-domain-{short_uid()}.localhost.localstack.cloud"

        # optional "region_name" kwarg selects the client region; remember it for cleanup
        region_name = kwargs.pop("region_name", None)
        acm_client = aws_client_factory(region_name=region_name).acm

        response = acm_client.request_certificate(**kwargs)
        created_certificate_arn = response["CertificateArn"]
        certificate_arns.append((created_certificate_arn, region_name))
        return response

    yield factory

    # cleanup (best effort, in the region each certificate was requested in)
    for certificate_arn, region_name in certificate_arns:
        try:
            acm_client = aws_client_factory(region_name=region_name).acm
            acm_client.delete_certificate(CertificateArn=certificate_arn)
        except Exception as e:
            LOG.debug("error cleaning up certificate %s: %s", certificate_arn, e)

1804

1805
# Allow-all ("superuser") IAM policy document attached to autogenerated test roles.
role_policy_su = {
    "Version": "2012-10-17",
    "Statement": [{"Effect": "Allow", "Action": ["*"], "Resource": ["*"]}],
}
1809

1810

1811
@pytest.fixture(scope="session")
def lambda_su_role(aws_client):
    """Session-scoped IAM role with an allow-all policy, yielded as a role ARN for Lambda tests.

    Fix: the cleanup previously called `run_safe(client.op(...))`, which executes the
    operation *before* `run_safe` can guard it - any error (e.g. role already removed)
    escaped and skipped the remaining cleanup calls. Pass the callable plus kwargs to
    `run_safe` instead, so failures are swallowed as intended.
    """
    role_name = f"lambda-autogenerated-{short_uid()}"
    role = aws_client.iam.create_role(
        RoleName=role_name, AssumeRolePolicyDocument=role_assume_policy
    )["Role"]
    policy_name = f"lambda-autogenerated-{short_uid()}"
    policy_arn = aws_client.iam.create_policy(
        PolicyName=policy_name, PolicyDocument=json.dumps(role_policy_su)
    )["Policy"]["Arn"]
    aws_client.iam.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)

    if is_aws_cloud():  # dirty but necessary: wait for IAM propagation
        time.sleep(10)

    yield role["Arn"]

    # best-effort cleanup: detach first, as attached policies block role/policy deletion
    run_safe(aws_client.iam.detach_role_policy, RoleName=role_name, PolicyArn=policy_arn)
    run_safe(aws_client.iam.delete_role, RoleName=role_name)
    run_safe(aws_client.iam.delete_policy, PolicyArn=policy_arn)
1831

1832

1833
@pytest.fixture
def create_iam_role_and_attach_policy(aws_client):
    """
    Fixture that creates an IAM role with given role definition and predefined policy ARN.

    Use this fixture with AWS managed policies like 'AmazonS3ReadOnlyAccess' or 'AmazonKinesisFullAccess'.

    Fix: the teardown now detaches the managed policy before deleting the role -
    IAM rejects DeleteRole while policies are still attached (DeleteConflict),
    which previously leaked the role when running against AWS.
    """
    roles = []  # (role_name, policy_arn) pairs for cleanup

    def _inner(**kwargs) -> str:
        """
        :param dict RoleDefinition: role definition document
        :param str PolicyArn: policy ARN
        :param str RoleName: role name (autogenerated if omitted)
        :return: role ARN
        """
        if "RoleName" not in kwargs:
            kwargs["RoleName"] = f"test-role-{short_uid()}"

        role = kwargs["RoleName"]
        role_policy = json.dumps(kwargs["RoleDefinition"])

        result = aws_client.iam.create_role(RoleName=role, AssumeRolePolicyDocument=role_policy)
        role_arn = result["Role"]["Arn"]

        policy_arn = kwargs["PolicyArn"]
        aws_client.iam.attach_role_policy(PolicyArn=policy_arn, RoleName=role)

        roles.append((role, policy_arn))
        return role_arn

    yield _inner

    for role, policy_arn in roles:
        try:
            aws_client.iam.detach_role_policy(RoleName=role, PolicyArn=policy_arn)
        except Exception as exc:
            LOG.debug("Error detaching policy '%s' from IAM role '%s': %s", policy_arn, role, exc)
        try:
            aws_client.iam.delete_role(RoleName=role)
        except Exception as exc:
            LOG.debug("Error deleting IAM role '%s': %s", role, exc)

1872

1873
@pytest.fixture
def create_iam_role_with_policy(aws_client):
    """
    Fixture that creates an IAM role with given role definition and policy definition.
    """
    created_roles = {}  # role name -> inline policy name, for teardown

    def _create_role_and_policy(**kwargs: dict[str, any]) -> str:
        """
        :param dict RoleDefinition: role definition document
        :param dict PolicyDefinition: policy definition document
        :param str PolicyName: policy name (autogenerated if omitted)
        :param str RoleName: role name (autogenerated if omitted)
        :return: role ARN
        """
        kwargs.setdefault("RoleName", f"test-role-{short_uid()}")
        kwargs.setdefault("PolicyName", f"test-policy-{short_uid()}")
        role_name = kwargs["RoleName"]
        policy_name = kwargs["PolicyName"]

        assume_doc = json.dumps(kwargs["RoleDefinition"])
        role_arn = aws_client.iam.create_role(
            RoleName=role_name, AssumeRolePolicyDocument=assume_doc
        )["Role"]["Arn"]

        aws_client.iam.put_role_policy(
            RoleName=role_name,
            PolicyName=policy_name,
            PolicyDocument=json.dumps(kwargs["PolicyDefinition"]),
        )
        created_roles[role_name] = policy_name
        return role_arn

    yield _create_role_and_policy

    # teardown: remove the inline policy first, then the role (best effort)
    for role_name, policy_name in created_roles.items():
        try:
            aws_client.iam.delete_role_policy(RoleName=role_name, PolicyName=policy_name)
        except Exception as exc:
            LOG.debug("Error deleting IAM role policy '%s' '%s': %s", role_name, policy_name, exc)
        try:
            aws_client.iam.delete_role(RoleName=role_name)
        except Exception as exc:
            LOG.debug("Error deleting IAM role '%s': %s", role_name, exc)
1917

1918

1919
@pytest.fixture
def firehose_create_delivery_stream(wait_for_delivery_stream_ready, aws_client):
    """Factory fixture creating Firehose delivery streams and waiting until they are ready."""
    created_streams = []

    def _create_delivery_stream(**kwargs):
        kwargs.setdefault("DeliveryStreamName", f"test-delivery-stream-{short_uid()}")
        stream_name = kwargs["DeliveryStreamName"]
        response = aws_client.firehose.create_delivery_stream(**kwargs)
        created_streams.append(stream_name)
        # block until the stream is usable before handing it to the test
        wait_for_delivery_stream_ready(stream_name)
        return response

    yield _create_delivery_stream

    for stream_name in created_streams:
        try:
            aws_client.firehose.delete_delivery_stream(DeliveryStreamName=stream_name)
        except Exception:
            LOG.info("Failed to delete delivery stream %s", stream_name)

1940

1941
@pytest.fixture
def ses_configuration_set(aws_client):
    """Factory fixture creating SES configuration sets, removed again on teardown."""
    created_names = []

    def factory(name: str) -> None:
        aws_client.ses.create_configuration_set(ConfigurationSet={"Name": name})
        created_names.append(name)

    yield factory

    for name in created_names:
        aws_client.ses.delete_configuration_set(ConfigurationSetName=name)
1957

1958

1959
@pytest.fixture
def ses_configuration_set_sns_event_destination(aws_client):
    """Factory fixture creating SES SNS event destinations, removed again on teardown."""
    created = []

    def factory(config_set_name: str, event_destination_name: str, topic_arn: str) -> None:
        destination = {
            "Name": event_destination_name,
            "Enabled": True,
            "MatchingEventTypes": ["send", "bounce", "delivery", "open", "click"],
            "SNSDestination": {"TopicARN": topic_arn},
        }
        aws_client.ses.create_configuration_set_event_destination(
            ConfigurationSetName=config_set_name,
            EventDestination=destination,
        )
        created.append((config_set_name, event_destination_name))

    yield factory

    for config_set_name, event_destination_name in created:
        aws_client.ses.delete_configuration_set_event_destination(
            ConfigurationSetName=config_set_name,
            EventDestinationName=event_destination_name,
        )
1984

1985

1986
@pytest.fixture
def ses_email_template(aws_client):
    """Factory fixture creating SES email templates, removed again on teardown.

    Fix: the `subject` default was an f-string evaluated once at function definition
    time, so every template created without an explicit subject shared the exact same
    subject line. A fresh default is now generated per call; callers passing their
    own subject are unaffected.
    """
    template_names = []

    def factory(name: str, contents: str, subject: str = None):
        if subject is None:
            subject = f"Email template {short_uid()}"
        aws_client.ses.create_template(
            Template={
                "TemplateName": name,
                "SubjectPart": subject,
                "TextPart": contents,
            }
        )
        template_names.append(name)

    yield factory

    for template_name in template_names:
        aws_client.ses.delete_template(TemplateName=template_name)
2004

2005

2006
@pytest.fixture
def ses_verify_identity(aws_client):
    """Factory fixture verifying SES email identities; identities are deleted on teardown.

    Fix: verified identities were never appended to the cleanup list, so teardown
    iterated an always-empty list and identities leaked across tests.
    """
    identities = []

    def factory(email_address: str) -> None:
        aws_client.ses.verify_email_identity(EmailAddress=email_address)
        identities.append(email_address)

    yield factory

    for identity in identities:
        aws_client.ses.delete_identity(Identity=identity)
2017

2018

2019
@pytest.fixture
def setup_sender_email_address(ses_verify_identity):
    """
    If the test is running against AWS then assume the email address passed is already
    verified, and passes the given email address through. Otherwise, it generates one random
    email address and verify them.
    """

    def inner(sender_email_address: str | None = None) -> str:
        if not is_aws_cloud():
            # overwrite the given parameters with localstack specific ones
            sender_email_address = f"sender-{short_uid()}@example.com"
            ses_verify_identity(sender_email_address)
        elif sender_email_address is None:
            raise ValueError(
                "sender_email_address must be specified to run this test against AWS"
            )
        return sender_email_address

    return inner
2041

2042

2043
@pytest.fixture
def ec2_create_security_group(aws_client):
    """Factory fixture creating EC2 security groups, deleted again on teardown.

    Fix (addresses the previous FIXME): `authorize_security_group_ingress` is only
    invoked when at least one port was requested - calling it with an empty
    `IpPermissions` list fails.
    """
    ec2_sgs = []

    def factory(ports=None, ip_protocol: str = "tcp", **kwargs):
        """
        Create the security group and authorize the security group ingress.
        :param ports: list of ports to be authorized for the ingress rule.
        :param ip_protocol: the ip protocol for the permissions (tcp by default)
        """
        if "GroupName" not in kwargs:
            # FIXME: This will fail against AWS since the sg prefix is not valid for GroupName
            # > "Group names may not be in the format sg-*".
            kwargs["GroupName"] = f"sg-{short_uid()}"
        # Making sure the call to CreateSecurityGroup gets the right arguments
        _args = select_from_typed_dict(CreateSecurityGroupRequest, kwargs)
        security_group = aws_client.ec2.create_security_group(**_args)
        security_group_id = security_group["GroupId"]

        # open each requested port for ingress from anywhere (0.0.0.0/0)
        permissions = [
            {
                "FromPort": port,
                "IpProtocol": ip_protocol,
                "IpRanges": [{"CidrIp": "0.0.0.0/0"}],
                "ToPort": port,
            }
            for port in ports or []
        ]
        if permissions:
            aws_client.ec2.authorize_security_group_ingress(
                GroupId=security_group_id,
                IpPermissions=permissions,
            )

        ec2_sgs.append(security_group_id)
        return security_group

    yield factory

    for sg_group_id in ec2_sgs:
        try:
            aws_client.ec2.delete_security_group(GroupId=sg_group_id)
        except Exception as e:
            LOG.debug("Error cleaning up EC2 security group: %s, %s", sg_group_id, e)

2089

2090
@pytest.fixture
def ec2_create_vpc_endpoint(aws_client):
    """Factory fixture creating VPC endpoints and waiting until they become available;
    all created endpoints are deleted (and, on AWS, awaited) on teardown."""
    vpc_endpoints = []

    def _create(**kwargs: Unpack[CreateVpcEndpointRequest]) -> VpcEndpoint:
        endpoint = aws_client.ec2.create_vpc_endpoint(**kwargs)
        endpoint_id = endpoint["VpcEndpoint"]["VpcEndpointId"]
        # register for cleanup immediately, even if the endpoint never becomes available
        vpc_endpoints.append(endpoint_id)

        def _check_available() -> VpcEndpoint:
            result = aws_client.ec2.describe_vpc_endpoints(VpcEndpointIds=[endpoint_id])
            _endpoint_details = result["VpcEndpoints"][0]
            assert _endpoint_details["State"] == "available"

            return _endpoint_details

        # poll until available; returns the full endpoint description
        return retry(_check_available, retries=30, sleep=5 if is_aws_cloud() else 1)

    yield _create

    try:
        aws_client.ec2.delete_vpc_endpoints(VpcEndpointIds=vpc_endpoints)
    except Exception as e:
        LOG.error("Error cleaning up VPC endpoint: %s, %s", vpc_endpoints, e)

    def wait_for_endpoint_deleted():
        try:
            endpoints = aws_client.ec2.describe_vpc_endpoints(VpcEndpointIds=vpc_endpoints)
            # NOTE(review): the creation check above compares against lowercase
            # "available"; confirm "Deleted" (capitalized) matches the real State value
            assert len(endpoints["VpcEndpoints"]) == 0 or all(
                endpoint["State"] == "Deleted" for endpoint in endpoints["VpcEndpoints"]
            )
        except botocore.exceptions.ClientError:
            # describe may fail once the endpoints are fully gone - treat as deleted
            pass

    # the vpc can't be deleted if an endpoint exists
    if is_aws_cloud():
        retry(wait_for_endpoint_deleted, retries=30, sleep=10 if is_aws_cloud() else 1)

2128

2129
@pytest.fixture
def cleanups():
    """Yields a list that tests append zero-argument callables to; they are executed
    in reverse registration order on teardown, continuing past individual failures."""
    registered = []

    yield registered

    # LIFO: undo the most recent registrations first
    for callback in reversed(registered):
        try:
            callback()
        except Exception as e:
            LOG.warning("Failed to execute cleanup", exc_info=e)
2140

2141

2142
@pytest.fixture(scope="session")
def account_id(aws_client):
    """Return the account ID tests run under (via STS when available, static fallback otherwise)."""
    if not (is_aws_cloud() or is_api_enabled("sts")):
        return TEST_AWS_ACCOUNT_ID
    return aws_client.sts.get_caller_identity()["Account"]
2148

2149

2150
@pytest.fixture(scope="session")
def region_name(aws_client):
    """Return the region tests run in (from the STS client when available, static fallback otherwise)."""
    if not (is_aws_cloud() or is_api_enabled("sts")):
        return TEST_AWS_REGION_NAME
    return aws_client.sts.meta.region_name
2156

2157

2158
@pytest.fixture(scope="session")
def partition(region_name):
    """Return the AWS partition (e.g. "aws", "aws-cn") matching the test region."""
    return get_partition(region_name)
2161

2162

2163
@pytest.fixture(scope="session")
def secondary_account_id(secondary_aws_client):
    """Return the account ID of the secondary (cross-account) test client."""
    if not (is_aws_cloud() or is_api_enabled("sts")):
        return SECONDARY_TEST_AWS_ACCOUNT_ID
    return secondary_aws_client.sts.get_caller_identity()["Account"]
2169

2170

2171
@pytest.fixture(scope="session")
def secondary_region_name():
    """Return the region used for cross-region test scenarios."""
    return SECONDARY_TEST_AWS_REGION_NAME
2174

2175

2176
@pytest.hookimpl
def pytest_collection_modifyitems(config: Config, items: list[Item]):
    """Collection hook: skip LocalStack-only tests on AWS and tag snapshot-validated tests."""
    skip_on_aws = pytest.mark.skipif(
        is_aws_cloud(),
        reason="test only applicable if run against localstack",
    )
    for test_item in items:
        for marker in test_item.iter_markers():
            if marker.name.endswith("only_localstack"):
                test_item.add_marker(skip_on_aws)
        if hasattr(test_item, "fixturenames") and "snapshot" in test_item.fixturenames:
            # add a marker that indicates that this test is snapshot validated
            # if it uses the snapshot fixture -> allows selecting only snapshot
            # validated tests in order to capture new snapshots for a whole
            # test file with "-m snapshot_validated"
            test_item.add_marker("snapshot_validated")
2192

2193

2194
@pytest.fixture
def sample_stores() -> AccountRegionBundle:
    """Provide a minimal AccountRegionBundle with one attribute per scoping level, for store tests."""

    class SampleStore(BaseStore):
        # attribute shared across accounts (per CrossAccountAttribute semantics)
        CROSS_ACCOUNT_ATTR = CrossAccountAttribute(default=list)
        # attribute shared across regions within one account
        CROSS_REGION_ATTR = CrossRegionAttribute(default=list)
        # attribute local to a single account/region pair
        region_specific_attr = LocalAttribute(default=list)

    # "zzz" is a dummy service name; validation disabled since it is not a real service
    return AccountRegionBundle("zzz", SampleStore, validate=False)
2202

2203

2204
@pytest.fixture
def create_rest_apigw(aws_client_factory):
    """Factory fixture to create API Gateway REST APIs; the APIs plus any usage plans
    and API keys linked to them are removed on teardown."""
    rest_apis = []
    # against AWS, use an adaptive-retry client config to survive throttling
    retry_boto_config = None
    if is_aws_cloud():
        retry_boto_config = botocore.config.Config(
            # Api gateway can throttle requests pretty heavily. Leading to potentially undeleted apis
            retries={"max_attempts": 10, "mode": "adaptive"}
        )

    def _create_apigateway_function(**kwargs):
        # optional "region_name" kwarg selects the client region; remaining kwargs
        # are passed through to create_rest_api
        client_region_name = kwargs.pop("region_name", None)
        apigateway_client = aws_client_factory(
            region_name=client_region_name, config=retry_boto_config
        ).apigateway
        kwargs.setdefault("name", f"api-{short_uid()}")

        response = apigateway_client.create_rest_api(**kwargs)
        api_id = response.get("id")
        rest_apis.append((api_id, client_region_name))

        return api_id, response.get("name"), response.get("rootResourceId")

    yield _create_apigateway_function

    for rest_api_id, _client_region_name in rest_apis:
        apigateway_client = aws_client_factory(
            region_name=_client_region_name,
            config=retry_boto_config,
        ).apigateway
        # First, retrieve the usage plans associated with the REST API
        usage_plan_ids = []
        usage_plans = apigateway_client.get_usage_plans()
        for item in usage_plans.get("items", []):
            api_stages = item.get("apiStages", [])
            usage_plan_ids.extend(
                item.get("id") for api_stage in api_stages if api_stage.get("apiId") == rest_api_id
            )

        # Then delete the API, as you can't delete the UsagePlan if a stage is associated with it
        with contextlib.suppress(Exception):
            apigateway_client.delete_rest_api(restApiId=rest_api_id)

        # finally delete the usage plans and the API Keys linked to it
        for usage_plan_id in usage_plan_ids:
            usage_plan_keys = apigateway_client.get_usage_plan_keys(usagePlanId=usage_plan_id)
            for key in usage_plan_keys.get("items", []):
                apigateway_client.delete_api_key(apiKey=key["id"])
            apigateway_client.delete_usage_plan(usagePlanId=usage_plan_id)
2253

2254

2255
@pytest.fixture
def create_rest_apigw_openapi(aws_client_factory):
    """Factory fixture importing REST APIs from an OpenAPI definition; APIs are deleted on teardown."""
    imported_apis = []

    def _import_api(**kwargs):
        # optional "region_name" kwarg selects the client region
        region = kwargs.pop("region_name", None)
        client = aws_client_factory(region_name=region).apigateway

        response = client.import_rest_api(**kwargs)
        api_id = response.get("id")
        imported_apis.append((api_id, region))
        return api_id, response

    yield _import_api

    for api_id, region in imported_apis:
        with contextlib.suppress(Exception):
            client = aws_client_factory(region_name=region).apigateway
            client.delete_rest_api(restApiId=api_id)
2274

2275

2276
@pytest.fixture
def assert_host_customisation(monkeypatch):
    """Patches LOCALSTACK_HOST to a sentinel value and yields an asserter that checks
    whether a given URL honours the configured (or a custom) host."""
    sentinel_host = "foo.bar"
    monkeypatch.setattr(
        config, "LOCALSTACK_HOST", config.HostAndPort(host=sentinel_host, port=8888)
    )

    def asserter(
        url: str,
        *,
        custom_host: str | None = None,
    ):
        if custom_host is None:
            # no custom host: the URL must reflect the patched LOCALSTACK_HOST
            assert sentinel_host in url, f"Could not find `{sentinel_host}` in `{url}`"
        else:
            # custom host: it must appear, and the patched host must not leak in
            assert custom_host in url, f"Could not find `{custom_host}` in `{url}`"

            assert sentinel_host not in url

    yield asserter
2296

2297

2298
@pytest.fixture
def echo_http_server(httpserver: HTTPServer):
    """Spins up a local HTTP echo server and returns the endpoint URL"""

    def _handle(request: Request) -> Response:
        # best-effort parse of JSON bodies; leave None on parse failure
        parsed_json = None
        if request.is_json:
            with contextlib.suppress(ValueError):
                parsed_json = json.loads(request.data)
        payload = {
            "data": request.data or "{}",
            "headers": dict(request.headers),
            "url": request.url,
            "method": request.method,
            "json": parsed_json,
        }
        return Response(json.dumps(json_safe(payload)), status=200)

    # echo every request, regardless of path or method
    httpserver.expect_request("").respond_with_handler(_handle)
    return httpserver.url_for("/")
2321

2322

2323
@pytest.fixture
def echo_http_server_post(echo_http_server):
    """
    Returns an HTTP echo server URL for POST requests that work both locally and for parity tests (against real AWS)
    """
    if not is_aws_cloud():
        return f"{echo_http_server}post"
    return f"{PUBLIC_HTTP_ECHO_SERVER_URL}/post"
2332

2333

2334
def create_policy_doc(effect: str, actions: list, resource=None) -> dict:
    """Build an IAM policy document containing a single statement for the given effect/actions."""
    statement = {
        # TODO statement ids have to be alphanumeric [0-9A-Za-z], write a test for it
        "Sid": f"s{short_uid()}",
        "Effect": effect,
        "Action": ensure_list(actions),
        "Resource": resource or "*",
    }
    return {
        "Version": "2012-10-17",
        "Statement": [statement],
    }
2349

2350

2351
@pytest.fixture
def create_policy_generated_document(create_policy):
    """Factory fixture: build a single-statement policy document and create the policy, returning its ARN."""

    def _create_policy_with_doc(effect, actions, policy_name=None, resource=None, iam_client=None):
        name = policy_name or f"p-{short_uid()}"
        document = create_policy_doc(effect, actions, resource=resource)
        response = create_policy(
            PolicyName=name, PolicyDocument=json.dumps(document), iam_client=iam_client
        )
        return response["Policy"]["Arn"]

    return _create_policy_with_doc
2363

2364

2365
@pytest.fixture
def create_role_with_policy(create_role, create_policy_generated_document, aws_client):
    """Factory fixture that creates an IAM role and grants it permissions,
    either via an attached managed policy or via an inline role policy."""

    def _create_role_with_policy(
        effect, actions, assume_policy_doc, resource=None, attach=True, iam_client=None
    ):
        client = iam_client or aws_client.iam

        role_name = f"role-{short_uid()}"
        role_arn = create_role(
            RoleName=role_name, AssumeRolePolicyDocument=assume_policy_doc, iam_client=client
        )["Role"]["Arn"]
        policy_name = f"p-{short_uid()}"

        if attach:
            # create a managed policy and attach it to the role
            policy_arn = create_policy_generated_document(
                effect, actions, policy_name=policy_name, resource=resource, iam_client=client
            )
            client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
        else:
            # embed the policy document directly as an inline role policy
            document = json.dumps(create_policy_doc(effect, actions, resource=resource))
            client.put_role_policy(
                RoleName=role_name, PolicyName=policy_name, PolicyDocument=document
            )

        return role_name, role_arn

    return _create_role_with_policy
2396

2397

2398
@pytest.fixture
def create_user_with_policy(create_policy_generated_document, create_user, aws_client, region_name):
    """Factory fixture that creates an IAM user with an attached generated
    policy and returns the user name plus freshly created access keys."""

    def _create_user_with_policy(effect, actions, resource=None, user_name=None):
        policy_arn = create_policy_generated_document(effect, actions, resource=resource)
        username = user_name if user_name else f"user-{short_uid()}"
        create_user(UserName=username)
        iam = aws_client.iam
        iam.attach_user_policy(UserName=username, PolicyArn=policy_arn)
        access_key = iam.create_access_key(UserName=username)["AccessKey"]

        # IAM is eventually consistent: block until the new credentials work
        wait_for_user(keys=access_key, region_name=region_name)
        return username, access_key

    return _create_user_with_policy
2411

2412

2413
@pytest.fixture()
def register_extension(s3_bucket, aws_client):
    """Factory fixture that registers a CloudFormation extension from a local
    artifact (uploaded to S3 first) and deregisters it again on teardown."""
    cfn_client = aws_client.cloudformation
    registered_arns = []

    def _register(extension_name, extension_type, artifact_path):
        artifact_key = f"artifact-{short_uid()}"

        aws_client.s3.upload_file(artifact_path, s3_bucket, artifact_key)

        token = cfn_client.register_type(
            Type=extension_type,
            TypeName=extension_name,
            SchemaHandlerPackage=f"s3://{s3_bucket}/{artifact_key}",
        )["RegistrationToken"]

        # registration is asynchronous; block until it completes
        cfn_client.get_waiter("type_registration_complete").wait(RegistrationToken=token)

        describe_response = cfn_client.describe_type_registration(RegistrationToken=token)

        registered_arns.append(describe_response["TypeArn"])
        cfn_client.set_type_default_version(Arn=describe_response["TypeVersionArn"])

        return describe_response

    yield _register

    for arn in registered_arns:
        # best-effort removal of every registered version before the type itself
        for version in cfn_client.list_type_versions(Arn=arn)["TypeVersionSummaries"]:
            try:
                cfn_client.deregister_type(Arn=version["Arn"])
            except Exception:
                continue
        cfn_client.deregister_type(Arn=arn)
2454

2455

2456
@pytest.fixture
def hosted_zone(aws_client):
    """Factory fixture creating Route53 hosted zones, deleted on teardown."""
    created_zone_ids = []

    def factory(**kwargs):
        kwargs.setdefault("CallerReference", f"ref-{short_uid()}")
        response = aws_client.route53.create_hosted_zone(**kwargs)
        created_zone_ids.append(response["HostedZone"]["Id"])
        return response

    yield factory

    # delete in reverse creation order
    for zone_id in reversed(created_zone_ids):
        aws_client.route53.delete_hosted_zone(Id=zone_id)
2472

2473

2474
@pytest.fixture
def openapi_validate(monkeypatch):
    """Enable OpenAPI request/response validation for the duration of a test."""
    for option in ("OPENAPI_VALIDATE_RESPONSE", "OPENAPI_VALIDATE_REQUEST"):
        monkeypatch.setattr(config, option, "true")
2478

2479

2480
@pytest.fixture
def set_resource_custom_id():
    """Factory fixture to assign custom IDs to resources; every assignment
    is reverted on teardown."""
    registered = []

    def _set_custom_id(resource_identifier: ResourceIdentifier, custom_id):
        localstack_id_manager.set_custom_id(
            resource_identifier=resource_identifier, custom_id=custom_id
        )
        registered.append(resource_identifier)

    yield _set_custom_id

    for resource_identifier in registered:
        localstack_id_manager.unset_custom_id(resource_identifier)
2494

2495

2496
###############################
2497
# Events (EventBridge) fixtures
2498
###############################
2499

2500

2501
@pytest.fixture
def events_create_event_bus(aws_client, region_name, account_id):
    """Factory fixture creating EventBridge event buses.

    On teardown, all rules (with their targets) and archives of each created
    bus are removed best-effort before the bus itself is deleted.
    """
    created_bus_names = []

    def _create_event_bus(**kwargs):
        kwargs.setdefault("Name", f"test-event-bus-{short_uid()}")
        response = aws_client.events.create_event_bus(**kwargs)
        created_bus_names.append(kwargs["Name"])
        return response

    yield _create_event_bus

    for bus_name in created_bus_names:
        try:
            rule_names = [
                rule["Name"]
                for rule in aws_client.events.list_rules(EventBusName=bus_name)["Rules"]
            ]

            # Delete all rules for the current event bus
            for rule_name in rule_names:
                try:
                    targets = aws_client.events.list_targets_by_rule(
                        Rule=rule_name, EventBusName=bus_name
                    )["Targets"]

                    # a rule cannot be deleted while it still has targets
                    for target_id in [target["Id"] for target in targets]:
                        aws_client.events.remove_targets(
                            Rule=rule_name, EventBusName=bus_name, Ids=[target_id]
                        )

                    aws_client.events.delete_rule(Name=rule_name, EventBusName=bus_name)
                except Exception as e:
                    LOG.warning("Failed to delete rule %s: %s", rule_name, e)

            # Delete archives for event bus
            source_arn = f"arn:aws:events:{region_name}:{account_id}:event-bus/{bus_name}"
            archive_names = [
                archive["ArchiveName"]
                for archive in aws_client.events.list_archives(EventSourceArn=source_arn)[
                    "Archives"
                ]
            ]
            for archive_name in archive_names:
                try:
                    aws_client.events.delete_archive(ArchiveName=archive_name)
                except Exception as e:
                    LOG.warning("Failed to delete archive %s: %s", archive_name, e)

            aws_client.events.delete_event_bus(Name=bus_name)
        except Exception as e:
            LOG.warning("Failed to delete event bus %s: %s", bus_name, e)
2554

2555

2556
@pytest.fixture
def events_put_rule(aws_client):
    """Factory fixture wrapping ``events.put_rule``; created rules (including
    their targets) are deleted best-effort on teardown."""
    created_rules = []

    def _put_rule(**kwargs):
        kwargs.setdefault("Name", f"rule-{short_uid()}")
        response = aws_client.events.put_rule(**kwargs)
        created_rules.append((kwargs["Name"], kwargs.get("EventBusName", "default")))
        return response

    yield _put_rule

    for rule_name, bus_name in created_rules:
        try:
            targets = aws_client.events.list_targets_by_rule(
                Rule=rule_name, EventBusName=bus_name
            )["Targets"]

            # a rule cannot be deleted while it still has targets
            for target_id in [target["Id"] for target in targets]:
                aws_client.events.remove_targets(
                    Rule=rule_name, EventBusName=bus_name, Ids=[target_id]
                )

            aws_client.events.delete_rule(Name=rule_name, EventBusName=bus_name)
        except Exception as e:
            LOG.warning("Failed to delete rule %s: %s", rule_name, e)
2587

2588

2589
@pytest.fixture
def events_create_rule(aws_client):
    """Factory fixture creating EventBridge rules; rules (and their targets)
    are removed best-effort on teardown.

    The factory expects ``Name`` and optionally ``EventBusName``,
    ``EventPattern`` and ``ScheduleExpression`` as keyword arguments, and
    returns the created rule's ARN.
    """
    rules = []

    def _create_rule(**kwargs):
        rule_name = kwargs["Name"]
        bus_name = kwargs.get("EventBusName", "")
        pattern = kwargs.get("EventPattern", {})
        schedule = kwargs.get("ScheduleExpression", "")
        rule_arn = aws_client.events.put_rule(
            Name=rule_name,
            EventBusName=bus_name,
            EventPattern=json.dumps(pattern),
            ScheduleExpression=schedule,
        )["RuleArn"]
        rules.append({"name": rule_name, "bus": bus_name})
        return rule_arn

    yield _create_rule

    for rule in rules:
        # best-effort cleanup, consistent with the other events fixtures:
        # a failure for one rule must not prevent cleanup of the remaining ones
        try:
            targets = aws_client.events.list_targets_by_rule(
                Rule=rule["name"], EventBusName=rule["bus"]
            )["Targets"]

            # targets must be removed before the rule can be deleted
            target_ids = [target["Id"] for target in targets]
            if target_ids:
                aws_client.events.remove_targets(
                    Rule=rule["name"], EventBusName=rule["bus"], Ids=target_ids
                )

            aws_client.events.delete_rule(Name=rule["name"], EventBusName=rule["bus"])
        except Exception as e:
            LOG.warning("Failed to delete rule %s: %s", rule["name"], e)
2621

2622

2623
@pytest.fixture
def sqs_as_events_target(aws_client, sqs_get_queue_arn):
    """
    Fixture that creates an SQS queue and sets it up as a target for EventBridge events.
    """
    queue_urls = []

    def _sqs_as_events_target(
        queue_name: str | None = None, custom_aws_client=None
    ) -> tuple[str, str, str]:  # fixed: function returns a 3-tuple, annotation said 2-tuple
        """Create a queue that allows events.amazonaws.com to send messages.

        :param queue_name: optional queue name; a random one is generated if omitted
        :param custom_aws_client: optional client for cross-account/-region setups
        :return: tuple of (queue_url, queue_arn, queue_name)
        """
        if not queue_name:
            queue_name = f"tests-queue-{short_uid()}"
        if custom_aws_client:
            sqs_client = custom_aws_client.sqs
        else:
            sqs_client = aws_client.sqs
        queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]
        # remember the creating client, since cleanup must go through the same account/region
        queue_urls.append((queue_url, sqs_client))
        queue_arn = sqs_get_queue_arn(queue_url)
        # allow EventBridge to deliver messages to this queue
        policy = {
            "Version": "2012-10-17",
            "Id": f"sqs-eventbridge-{short_uid()}",
            "Statement": [
                {
                    "Sid": f"SendMessage-{short_uid()}",
                    "Effect": "Allow",
                    "Principal": {"Service": "events.amazonaws.com"},
                    "Action": "sqs:SendMessage",
                    "Resource": queue_arn,
                }
            ],
        }
        sqs_client.set_queue_attributes(
            QueueUrl=queue_url, Attributes={"Policy": json.dumps(policy)}
        )
        return queue_url, queue_arn, queue_name

    yield _sqs_as_events_target

    for queue_url, sqs_client in queue_urls:
        try:
            sqs_client.delete_queue(QueueUrl=queue_url)
        except Exception as e:
            LOG.debug("error cleaning up queue %s: %s", queue_url, e)
2667

2668

2669
@pytest.fixture
def create_role_event_bus_source_to_bus_target(create_iam_role_with_policy):
    """Factory fixture creating an IAM role that EventBridge can assume to
    forward events from a source event bus to a target event bus."""

    def _create_role_event_bus_to_bus():
        # trust policy: let the EventBridge service assume the role
        trust_policy = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {"Service": "events.amazonaws.com"},
                    "Action": "sts:AssumeRole",
                }
            ],
        }

        # permission policy: allow putting events onto any event bus
        permission_policy = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "",
                    "Effect": "Allow",
                    "Action": "events:PutEvents",
                    "Resource": "arn:aws:events:*:*:event-bus/*",
                }
            ],
        }

        return create_iam_role_with_policy(
            RoleDefinition=trust_policy,
            PolicyDefinition=permission_policy,
        )

    yield _create_role_event_bus_to_bus
2703

2704

2705
@pytest.fixture
def get_primary_secondary_client(
    aws_client_factory,
    secondary_aws_client_factory,
    region_name,
    secondary_region_name,
    account_id,
    secondary_account_id,
):
    def _get_primary_secondary_clients(cross_scenario: str):
        """
        Returns primary and secondary AWS clients based on the cross-scenario.
        :param cross_scenario: The scenario for cross-region or cross-account testing.
                               Options: "region", "account", "region_account"
                               account_region cross scenario is not supported by AWS
        :return: A dictionary containing primary and secondary AWS clients, and their respective region and account IDs.
        """
        if cross_scenario not in ("region", "account", "region_account"):
            raise ValueError(f"cross_scenario {cross_scenario} not supported")

        primary_client = aws_client_factory(region_name=region_name)

        if cross_scenario == "region":
            # same account, different region
            secondary_account = account_id
            secondary_region = secondary_region_name
            secondary_client = aws_client_factory(region_name=secondary_region_name)
        elif cross_scenario == "account":
            # different account, same region
            secondary_account = secondary_account_id
            secondary_region = region_name
            secondary_client = secondary_aws_client_factory(region_name=region_name)
        else:
            # "region_account": different account and different region
            secondary_account = secondary_account_id
            secondary_region = secondary_region_name
            secondary_client = secondary_aws_client_factory(region_name=secondary_region_name)

        return {
            "primary_aws_client": primary_client,
            "secondary_aws_client": secondary_client,
            "secondary_region_name": secondary_region,
            "secondary_account_id": secondary_account,
        }

    return _get_primary_secondary_clients
2748

2749

2750
@pytest.fixture
def clean_up(
    aws_client,
):  # TODO: legacy clean up fixtures for eventbridge - remove and use individual fixtures for creating resources instead
    """Best-effort cleanup helper for EventBridge-related test resources."""

    def _clean_up(
        bus_name=None,
        rule_name=None,
        target_ids=None,
        queue_url=None,
        log_group_name=None,
    ):
        events_client = aws_client.events
        bus_kwargs = {"EventBusName": bus_name} if bus_name else {}

        if target_ids:
            # accept a single id or a list of ids
            if not isinstance(target_ids, list):
                target_ids = [target_ids]
            call_safe(
                events_client.remove_targets,
                kwargs=dict(Rule=rule_name, Ids=target_ids, Force=True, **bus_kwargs),
            )
        if rule_name:
            call_safe(
                events_client.delete_rule, kwargs=dict(Name=rule_name, Force=True, **bus_kwargs)
            )
        if bus_name:
            call_safe(events_client.delete_event_bus, kwargs={"Name": bus_name})
        if queue_url:
            call_safe(aws_client.sqs.delete_queue, kwargs={"QueueUrl": queue_url})
        if log_group_name:
            logs_client = aws_client.logs

            def _delete_log_group():
                # log streams must be removed before the group itself
                streams = logs_client.describe_log_streams(logGroupName=log_group_name)
                for stream in streams["logStreams"]:
                    logs_client.delete_log_stream(
                        logGroupName=log_group_name, logStreamName=stream["logStreamName"]
                    )
                logs_client.delete_log_group(logGroupName=log_group_name)

            call_safe(_delete_log_group)

    yield _clean_up
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc