• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 044be5da-a5da-4293-9052-3ff1d51b5910

21 Feb 2025 08:33PM UTC coverage: 86.88% (-0.02%) from 86.896%
044be5da-a5da-4293-9052-3ff1d51b5910

push

circleci

web-flow
fix SNS FIFO ordering (#12285)

Co-authored-by: Daniel Fangl <daniel.fangl@localstack.cloud>

70 of 79 new or added lines in 2 files covered. (88.61%)

64 existing lines in 10 files now uncovered.

61659 of 70970 relevant lines covered (86.88%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.39
/localstack-core/localstack/testing/pytest/fixtures.py
1
import contextlib
1✔
2
import dataclasses
1✔
3
import json
1✔
4
import logging
1✔
5
import os
1✔
6
import re
1✔
7
import textwrap
1✔
8
import time
1✔
9
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple
1✔
10

11
import botocore.auth
1✔
12
import botocore.config
1✔
13
import botocore.credentials
1✔
14
import botocore.session
1✔
15
import pytest
1✔
16
from _pytest.config import Config
1✔
17
from _pytest.nodes import Item
1✔
18
from botocore.exceptions import ClientError
1✔
19
from botocore.regions import EndpointResolver
1✔
20
from pytest_httpserver import HTTPServer
1✔
21
from werkzeug import Request, Response
1✔
22

23
from localstack import config
1✔
24
from localstack.aws.connect import ServiceLevelClientFactory
1✔
25
from localstack.services.stores import (
1✔
26
    AccountRegionBundle,
27
    BaseStore,
28
    CrossAccountAttribute,
29
    CrossRegionAttribute,
30
    LocalAttribute,
31
)
32
from localstack.testing.aws.cloudformation_utils import load_template_file, render_template
1✔
33
from localstack.testing.aws.util import get_lambda_logs, is_aws_cloud
1✔
34
from localstack.testing.config import (
1✔
35
    SECONDARY_TEST_AWS_ACCOUNT_ID,
36
    SECONDARY_TEST_AWS_REGION_NAME,
37
    TEST_AWS_ACCOUNT_ID,
38
    TEST_AWS_REGION_NAME,
39
)
40
from localstack.utils import testutil
1✔
41
from localstack.utils.aws.arns import get_partition
1✔
42
from localstack.utils.aws.client import SigningHttpClient
1✔
43
from localstack.utils.aws.resources import create_dynamodb_table
1✔
44
from localstack.utils.bootstrap import is_api_enabled
1✔
45
from localstack.utils.collections import ensure_list
1✔
46
from localstack.utils.functions import call_safe, run_safe
1✔
47
from localstack.utils.http import safe_requests as requests
1✔
48
from localstack.utils.id_generator import ResourceIdentifier, localstack_id_manager
1✔
49
from localstack.utils.json import CustomEncoder, json_safe
1✔
50
from localstack.utils.net import wait_for_port_open
1✔
51
from localstack.utils.strings import short_uid, to_str
1✔
52
from localstack.utils.sync import ShortCircuitWaitException, poll_condition, retry, wait_until
1✔
53

54
LOG = logging.getLogger(__name__)
1✔
55

56
# URL of public HTTP echo server, used primarily for AWS parity/snapshot testing
57
PUBLIC_HTTP_ECHO_SERVER_URL = "http://httpbin.org"
1✔
58

59
WAITER_CHANGE_SET_CREATE_COMPLETE = "change_set_create_complete"
1✔
60
WAITER_STACK_CREATE_COMPLETE = "stack_create_complete"
1✔
61
WAITER_STACK_UPDATE_COMPLETE = "stack_update_complete"
1✔
62
WAITER_STACK_DELETE_COMPLETE = "stack_delete_complete"
1✔
63

64

65
if TYPE_CHECKING:
1✔
66
    from mypy_boto3_sqs import SQSClient
×
67
    from mypy_boto3_sqs.type_defs import MessageTypeDef
×
68

69

70
@pytest.fixture(scope="class")
def aws_http_client_factory(aws_session):
    """
    Returns a factory for creating new ``SigningHttpClient`` instances using a configurable botocore request signer.
    The default signer is a SigV4QueryAuth. The credentials are extracted from the ``boto3_sessions`` fixture that
    transparently uses your global profile when TEST_TARGET=AWS_CLOUD, or test credentials when running against
    LocalStack.

    Example invocations

        client = aws_signing_http_client_factory("sqs")
        client.get("http://localhost:4566/000000000000/my-queue")

    or
        client = aws_signing_http_client_factory("dynamodb", signer_factory=SigV4Auth)
        client.post("...")
    """

    def factory(
        service: str,
        region: str = None,
        signer_factory: Callable[
            [botocore.credentials.Credentials, str, str], botocore.auth.BaseSigner
        ] = botocore.auth.SigV4QueryAuth,
        endpoint_url: str = None,
        aws_access_key_id: str = None,
        aws_secret_access_key: str = None,
    ):
        region = region or TEST_AWS_REGION_NAME

        # explicitly supplied credentials take precedence over the session's
        if aws_access_key_id or aws_secret_access_key:
            source_credentials = botocore.credentials.Credentials(
                access_key=aws_access_key_id, secret_key=aws_secret_access_key
            )
        else:
            source_credentials = aws_session.get_credentials()

        frozen_creds = source_credentials.get_frozen_credentials()

        if not endpoint_url:
            if is_aws_cloud():
                # FIXME: this is a bit raw. we should probably re-use boto in a better way
                resolver: EndpointResolver = aws_session._session.get_component("endpoint_resolver")
                endpoint_url = "https://" + resolver.construct_endpoint(service, region)["hostname"]
            else:
                endpoint_url = config.internal_service_url()

        signer = signer_factory(frozen_creds, service, region)
        return SigningHttpClient(signer, endpoint_url=endpoint_url)

    return factory
120

121

122
@pytest.fixture(scope="class")
def s3_vhost_client(aws_client_factory, region_name):
    """An S3 client configured to use virtual-host-style bucket addressing."""
    vhost_config = botocore.config.Config(s3={"addressing_style": "virtual"})
    return aws_client_factory(config=vhost_config, region_name=region_name).s3
127

128

129
@pytest.fixture
def dynamodb_wait_for_table_active(aws_client):
    """Returns a waiter callable that blocks (up to 30s) until a DynamoDB table reaches ACTIVE state."""

    def wait_for_table_active(table_name: str, client=None):
        dynamodb = client or aws_client.dynamodb

        def _is_active():
            table = dynamodb.describe_table(TableName=table_name)["Table"]
            return table["TableStatus"] == "ACTIVE"

        poll_condition(_is_active, timeout=30)

    return wait_for_table_active
140

141

142
@pytest.fixture
def dynamodb_create_table_with_parameters(dynamodb_wait_for_table_active, aws_client):
    """Factory fixture creating DynamoDB tables from raw boto kwargs; tables are deleted on teardown."""
    created_tables = []

    def factory(**kwargs):
        if "TableName" not in kwargs:
            kwargs["TableName"] = f"test-table-{short_uid()}"

        table_name = kwargs["TableName"]
        created_tables.append(table_name)
        response = aws_client.dynamodb.create_table(**kwargs)
        # block until the table is usable before handing it to the test
        dynamodb_wait_for_table_active(table_name)
        return response

    yield factory

    # cleanup
    for table in created_tables:
        try:
            # table has to be in ACTIVE state before deletion
            dynamodb_wait_for_table_active(table)
            aws_client.dynamodb.delete_table(TableName=table)
        except Exception as e:
            LOG.debug("error cleaning up table %s: %s", table, e)
165

166

167
@pytest.fixture
def dynamodb_create_table(dynamodb_wait_for_table_active, aws_client):
    """Factory fixture wrapping the ``create_dynamodb_table`` utility; tables are deleted on teardown.

    Beware: the underlying utility swallows exceptions raised during table creation.
    """
    created_tables = []

    def factory(**kwargs):
        kwargs["client"] = aws_client.dynamodb
        kwargs.setdefault("table_name", f"test-table-{short_uid()}")
        kwargs.setdefault("partition_key", "id")

        created_tables.append(kwargs["table_name"])

        return create_dynamodb_table(**kwargs)

    yield factory

    # cleanup
    for table_name in created_tables:
        try:
            # the table must be ACTIVE before it can be deleted
            dynamodb_wait_for_table_active(table_name)
            aws_client.dynamodb.delete_table(TableName=table_name)
        except Exception as e:
            LOG.debug("error cleaning up table %s: %s", table_name, e)
193

194

195
@pytest.fixture
def s3_create_bucket(s3_empty_bucket, aws_client):
    """Factory fixture creating S3 buckets; each bucket is emptied and deleted on teardown.

    Returns the bucket name.
    """
    created_buckets = []

    def factory(**kwargs) -> str:
        if "Bucket" not in kwargs:
            kwargs["Bucket"] = "test-bucket-%s" % short_uid()

        # outside us-east-1 a LocationConstraint is mandatory for CreateBucket
        region = aws_client.s3.meta.region_name
        if region != "us-east-1" and "CreateBucketConfiguration" not in kwargs:
            kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}

        aws_client.s3.create_bucket(**kwargs)
        created_buckets.append(kwargs["Bucket"])
        return kwargs["Bucket"]

    yield factory

    # cleanup
    for bucket_name in created_buckets:
        try:
            s3_empty_bucket(bucket_name)
            aws_client.s3.delete_bucket(Bucket=bucket_name)
        except Exception as e:
            LOG.debug("error cleaning up bucket %s: %s", bucket_name, e)
224

225

226
@pytest.fixture
def s3_create_bucket_with_client(s3_empty_bucket, aws_client):
    """Factory fixture creating S3 buckets with a caller-supplied client (e.g. for another region/account).

    Cleanup uses the default client to empty and delete each bucket on teardown.
    """
    created_buckets = []

    def factory(s3_client, **kwargs) -> str:
        if "Bucket" not in kwargs:
            kwargs["Bucket"] = f"test-bucket-{short_uid()}"

        response = s3_client.create_bucket(**kwargs)
        created_buckets.append(kwargs["Bucket"])
        return response

    yield factory

    # cleanup
    for bucket_name in created_buckets:
        try:
            s3_empty_bucket(bucket_name)
            aws_client.s3.delete_bucket(Bucket=bucket_name)
        except Exception as e:
            LOG.debug("error cleaning up bucket %s: %s", bucket_name, e)
247

248

249
@pytest.fixture
def s3_bucket(s3_create_bucket, aws_client) -> str:
    """A single disposable S3 bucket in the client's region; returns the bucket name."""
    region = aws_client.s3.meta.region_name
    if region == "us-east-1":
        # us-east-1 must not receive a LocationConstraint
        return s3_create_bucket()
    return s3_create_bucket(CreateBucketConfiguration={"LocationConstraint": region})
256

257

258
@pytest.fixture
def s3_empty_bucket(aws_client):
    """
    Returns a factory that, given a bucket name, deletes all objects and all object versions.
    """

    # Boto resource would make this a straightforward task, but our internal client does not support Boto resource
    def factory(bucket_name: str):
        kwargs = {}
        try:
            # if object lock is enabled, governance-retained objects can only be
            # removed with the bypass flag
            aws_client.s3.get_object_lock_configuration(Bucket=bucket_name)
            kwargs["BypassGovernanceRetention"] = True
        except ClientError:
            pass

        def _delete_batched(object_identifiers: list):
            # DeleteObjects accepts at most 1000 keys per request
            for i in range(0, len(object_identifiers), 1000):
                aws_client.s3.delete_objects(
                    Bucket=bucket_name,
                    Delete={"Objects": object_identifiers[i : i + 1000]},
                    **kwargs,
                )

        # paginate so buckets with more than 1000 objects are fully emptied
        # (previously only the first ListObjectsV2 page was deleted)
        objects = []
        for page in aws_client.s3.get_paginator("list_objects_v2").paginate(Bucket=bucket_name):
            objects.extend({"Key": obj["Key"]} for obj in page.get("Contents", []))
        if objects:
            _delete_batched(objects)

        # also remove all object versions and delete markers (versioned buckets)
        object_versions = []
        for page in aws_client.s3.get_paginator("list_object_versions").paginate(
            Bucket=bucket_name
        ):
            versions = page.get("Versions", []) + page.get("DeleteMarkers", [])
            object_versions.extend(
                {"Key": obj["Key"], "VersionId": obj["VersionId"]} for obj in versions
            )
        if object_versions:
            _delete_batched(object_versions)

    yield factory
296

297

298
@pytest.fixture
def sqs_create_queue(aws_client):
    """Factory fixture creating SQS queues; queues are deleted on teardown. Returns the queue URL."""
    created_queue_urls = []

    def factory(**kwargs):
        if "QueueName" not in kwargs:
            kwargs["QueueName"] = "test-queue-%s" % short_uid()

        queue_url = aws_client.sqs.create_queue(**kwargs)["QueueUrl"]
        created_queue_urls.append(queue_url)

        return queue_url

    yield factory

    # cleanup
    for queue_url in created_queue_urls:
        try:
            aws_client.sqs.delete_queue(QueueUrl=queue_url)
        except Exception as e:
            LOG.debug("error cleaning up queue %s: %s", queue_url, e)
320

321

322
@pytest.fixture
def sqs_receive_messages_delete(aws_client):
    """Returns a callable that receives, JSON-parses, and deletes messages from a queue."""

    def factory(
        queue_url: str,
        expected_messages: Optional[int] = None,
        wait_time: Optional[int] = 5,
    ):
        response = aws_client.sqs.receive_message(
            QueueUrl=queue_url,
            MessageAttributeNames=["All"],
            VisibilityTimeout=0,
            WaitTimeSeconds=wait_time,
        )
        # NOTE: deliberately raises KeyError when no "Messages" key is present —
        # sqs_receive_num_messages relies on that to detect an empty poll.
        received = response["Messages"]
        messages = [json.loads(to_str(m["Body"])) for m in received]

        if expected_messages is not None:
            assert len(messages) == expected_messages

        for m in received:
            aws_client.sqs.delete_message(
                QueueUrl=queue_url, ReceiptHandle=m["ReceiptHandle"]
            )

        return messages

    return factory
351

352

353
@pytest.fixture
def sqs_receive_num_messages(sqs_receive_messages_delete):
    """Returns a callable that collects exactly ``expected_messages`` from a queue, retrying up to ``max_iterations``."""

    def factory(queue_url: str, expected_messages: int, max_iterations: int = 3):
        collected = []
        for _ in range(max_iterations):
            try:
                batch = sqs_receive_messages_delete(queue_url, wait_time=5)
            except KeyError:
                # KeyError signals an empty receive (no "Messages" key in the response)
                continue
            collected.extend(batch)

            if len(collected) >= expected_messages:
                return collected[:expected_messages]

        raise AssertionError(f"max iterations reached with {len(collected)} messages received")

    return factory
371

372

373
@pytest.fixture
def sqs_collect_messages(aws_client):
    """Collects SQS messages from a given queue_url and deletes them by default.

    Example usage:
    messages = sqs_collect_messages(
         my_queue_url,
         expected=2,
         timeout=10,
         attribute_names=["All"],
         message_attribute_names=["All"],
    )

    :param queue_url: the queue to poll
    :param expected: minimum number of messages to collect before returning
    :param timeout: overall timeout (seconds) for the collection
    :param delete: whether to delete each received message (default True)
    :param attribute_names: system attribute names to request
    :param message_attribute_names: message attribute names to request
    :param max_number_of_messages: batch size per receive call (max 10)
    :param wait_time_seconds: long-poll duration per receive call (max 20)
    :param sqs_client: optional client override (defaults to aws_client.sqs)
    :raises TimeoutError: if fewer than ``expected`` messages arrived within ``timeout``
    """

    def factory(
        queue_url: str,
        expected: int,
        timeout: int,
        delete: bool = True,
        attribute_names: list[str] = None,
        message_attribute_names: list[str] = None,
        max_number_of_messages: int = 1,
        wait_time_seconds: int = 5,
        sqs_client: "SQSClient | None" = None,
    ) -> list["MessageTypeDef"]:
        sqs_client = sqs_client or aws_client.sqs
        collected = []

        def _receive():
            response = sqs_client.receive_message(
                QueueUrl=queue_url,
                # Maximum is 20 seconds. Performs long polling.
                WaitTimeSeconds=wait_time_seconds,
                # Maximum 10 messages
                MaxNumberOfMessages=max_number_of_messages,
                AttributeNames=attribute_names or [],
                MessageAttributeNames=message_attribute_names or [],
            )

            if messages := response.get("Messages"):
                collected.extend(messages)

                if delete:
                    for m in messages:
                        sqs_client.delete_message(
                            QueueUrl=queue_url, ReceiptHandle=m["ReceiptHandle"]
                        )

            return len(collected) >= expected

        if not poll_condition(_receive, timeout=timeout):
            # fix: the error message was missing its closing parenthesis
            raise TimeoutError(
                f"gave up waiting for messages (expected={expected}, actual={len(collected)})"
            )

        return collected

    yield factory
430

431

432
@pytest.fixture
def sqs_queue(sqs_create_queue):
    """A single disposable SQS queue; returns its queue URL."""
    return sqs_create_queue()
435

436

437
@pytest.fixture
def sqs_get_queue_arn(aws_client) -> Callable:
    """Returns a callable resolving a queue URL to its QueueArn attribute."""

    def _get_queue_arn(queue_url: str) -> str:
        attributes = aws_client.sqs.get_queue_attributes(
            QueueUrl=queue_url, AttributeNames=["QueueArn"]
        )["Attributes"]
        return attributes["QueueArn"]

    return _get_queue_arn
445

446

447
@pytest.fixture
def sqs_queue_exists(aws_client):
    def _queue_exists(queue_url: str) -> bool:
        """
        Checks whether a queue with the given queue URL exists.
        :param queue_url: the queue URL
        :return: true if the queue exists, false otherwise
        """
        queue_name = queue_url.split("/")[-1]
        try:
            result = aws_client.sqs.get_queue_url(QueueName=queue_name)
        except ClientError as e:
            if "NonExistentQueue" in e.response["Error"]["Code"]:
                return False
            raise
        # the resolved URL must match exactly (guards against same-named queues elsewhere)
        return result.get("QueueUrl") == queue_url

    yield _queue_exists
464

465

466
@pytest.fixture
def sns_create_topic(aws_client):
    """Factory fixture creating SNS topics; topics are deleted on teardown."""
    created_topic_arns = []

    def _create_topic(**kwargs):
        if "Name" not in kwargs:
            kwargs["Name"] = "test-topic-%s" % short_uid()
        response = aws_client.sns.create_topic(**kwargs)
        created_topic_arns.append(response["TopicArn"])
        return response

    yield _create_topic

    for topic_arn in created_topic_arns:
        try:
            aws_client.sns.delete_topic(TopicArn=topic_arn)
        except Exception as e:
            LOG.debug("error cleaning up topic %s: %s", topic_arn, e)
484

485

486
@pytest.fixture
def sns_wait_for_topic_delete(aws_client):
    """Returns a callable that blocks (up to 30s) until the given SNS topic no longer exists."""

    def wait_for_topic_delete(topic_arn: str) -> None:
        def wait():
            try:
                aws_client.sns.get_topic_attributes(TopicArn=topic_arn)
                return False
            except ClientError as e:
                # fix: only botocore's ClientError carries the `response` attribute;
                # the previous broad `except Exception` raised AttributeError for
                # any other exception type instead of propagating it cleanly
                if "NotFound" in e.response["Error"]["Code"]:
                    return True

                raise

        poll_condition(wait, timeout=30)

    return wait_for_topic_delete
502

503

504
@pytest.fixture
def sns_subscription(aws_client):
    """Factory fixture around ``sns.subscribe``; subscriptions are unsubscribed on teardown.

    Requires 'TopicArn', 'Protocol', and 'Endpoint' kwargs; ReturnSubscriptionArn defaults to True.
    """
    subscription_arns = []

    def _create_sub(**kwargs):
        if kwargs.get("ReturnSubscriptionArn") is None:
            kwargs["ReturnSubscriptionArn"] = True

        response = aws_client.sns.subscribe(**kwargs)
        subscription_arns.append(response["SubscriptionArn"])
        return response

    yield _create_sub

    for sub_arn in subscription_arns:
        try:
            aws_client.sns.unsubscribe(SubscriptionArn=sub_arn)
        except Exception as e:
            LOG.debug("error cleaning up subscription %s: %s", sub_arn, e)
525

526

527
@pytest.fixture
def sns_topic(sns_create_topic, aws_client):
    """A single disposable SNS topic; returns its GetTopicAttributes response."""
    arn = sns_create_topic()["TopicArn"]
    return aws_client.sns.get_topic_attributes(TopicArn=arn)
531

532

533
@pytest.fixture
def sns_allow_topic_sqs_queue(aws_client):
    """Returns a callable that installs a queue policy allowing an SNS topic to send to an SQS queue."""

    def _allow_sns_topic(sqs_queue_url, sqs_queue_arn, sns_topic_arn) -> None:
        # allow topic to write to sqs queue
        policy = {
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {"Service": "sns.amazonaws.com"},
                    "Action": "sqs:SendMessage",
                    "Resource": sqs_queue_arn,
                    "Condition": {"ArnEquals": {"aws:SourceArn": sns_topic_arn}},
                }
            ]
        }
        aws_client.sqs.set_queue_attributes(
            QueueUrl=sqs_queue_url,
            Attributes={"Policy": json.dumps(policy)},
        )

    return _allow_sns_topic
557

558

559
@pytest.fixture
def sns_create_sqs_subscription(sns_allow_topic_sqs_queue, sqs_get_queue_arn, aws_client):
    """Factory fixture subscribing an SQS queue to an SNS topic (incl. queue policy); unsubscribes on teardown.

    Returns the subscription's attributes.
    """
    subscription_arns = []

    def _factory(topic_arn: str, queue_url: str, **kwargs) -> Dict[str, str]:
        queue_arn = sqs_get_queue_arn(queue_url)

        # connect sns topic to sqs
        subscription_arn = aws_client.sns.subscribe(
            TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn, **kwargs
        )["SubscriptionArn"]

        # allow topic to write to sqs queue
        sns_allow_topic_sqs_queue(
            sqs_queue_url=queue_url, sqs_queue_arn=queue_arn, sns_topic_arn=topic_arn
        )

        subscription_arns.append(subscription_arn)
        attributes = aws_client.sns.get_subscription_attributes(SubscriptionArn=subscription_arn)
        return attributes["Attributes"]

    yield _factory

    for arn in subscription_arns:
        try:
            aws_client.sns.unsubscribe(SubscriptionArn=arn)
        except Exception as e:
            LOG.error("error cleaning up subscription %s: %s", arn, e)
589

590

591
@pytest.fixture
def sns_create_http_endpoint(sns_create_topic, sns_subscription, aws_client):
    """Factory that starts a local HTTP server and subscribes it to a fresh SNS topic.

    The factory returns a tuple of (topic_arn, subscription_arn, endpoint_url, server);
    all started servers are stopped on teardown.
    """
    # This fixture can be used with manual setup to expose the HTTPServer fixture to AWS. One example is to use a
    # a service like localhost.run, and set up a specific port to start the `HTTPServer(port=40000)` for example,
    # and tunnel `localhost:40000` to a specific domain that you can manually return from this fixture.
    http_servers = []

    def _create_http_endpoint(
        raw_message_delivery: bool = False,
    ) -> Tuple[str, str, str, HTTPServer]:
        server = HTTPServer()
        server.start()
        http_servers.append(server)
        server.expect_request("/sns-endpoint").respond_with_data(status=200)
        endpoint_url = server.url_for("/sns-endpoint")
        wait_for_port_open(endpoint_url)

        topic_arn = sns_create_topic()["TopicArn"]
        subscription = sns_subscription(TopicArn=topic_arn, Protocol="http", Endpoint=endpoint_url)
        subscription_arn = subscription["SubscriptionArn"]
        # disable delivery retries so tests observe at most one delivery attempt per publish
        delivery_policy = {
            "healthyRetryPolicy": {
                "minDelayTarget": 1,
                "maxDelayTarget": 1,
                "numRetries": 0,
                "numNoDelayRetries": 0,
                "numMinDelayRetries": 0,
                "numMaxDelayRetries": 0,
                "backoffFunction": "linear",
            },
            "sicklyRetryPolicy": None,
            "throttlePolicy": {"maxReceivesPerSecond": 1000},
            "guaranteed": False,
        }
        aws_client.sns.set_subscription_attributes(
            SubscriptionArn=subscription_arn,
            AttributeName="DeliveryPolicy",
            AttributeValue=json.dumps(delivery_policy),
        )

        if raw_message_delivery:
            # deliver the raw message body instead of the SNS JSON envelope
            aws_client.sns.set_subscription_attributes(
                SubscriptionArn=subscription_arn,
                AttributeName="RawMessageDelivery",
                AttributeValue="true",
            )

        return topic_arn, subscription_arn, endpoint_url, server

    yield _create_http_endpoint

    # teardown: stop any servers that are still running
    for http_server in http_servers:
        if http_server.is_running():
            http_server.stop()
645

646

647
@pytest.fixture
def route53_hosted_zone(aws_client):
    """Factory fixture creating Route53 hosted zones; zones are deleted on teardown."""
    hosted_zones = []

    def factory(**kwargs):
        if "Name" not in kwargs:
            kwargs["Name"] = f"www.{short_uid()}.com."
        if "CallerReference" not in kwargs:
            kwargs["CallerReference"] = f"caller-ref-{short_uid()}"
        # fix: forward all kwargs — previously only Name and CallerReference were
        # passed, silently dropping any other parameters (e.g. HostedZoneConfig, VPC)
        response = aws_client.route53.create_hosted_zone(**kwargs)
        hosted_zones.append(response["HostedZone"]["Id"])
        return response

    yield factory

    for zone in hosted_zones:
        try:
            aws_client.route53.delete_hosted_zone(Id=zone)
        except Exception as e:
            LOG.debug("error cleaning up route53 HostedZone %s: %s", zone, e)
669

670

671
@pytest.fixture
def transcribe_create_job(s3_bucket, aws_client):
    """Factory fixture starting Transcribe jobs from a local audio file; jobs are deleted on teardown.

    Returns the transcription job name.
    """
    job_names = []

    def _create_job(audio_file: str, params: Optional[dict[str, Any]] = None) -> str:
        s3_key = "test-clip.wav"
        params = params or {}

        if "TranscriptionJobName" not in params:
            params["TranscriptionJobName"] = f"test-transcribe-{short_uid()}"
        if "LanguageCode" not in params:
            params["LanguageCode"] = "en-GB"
        if "Media" not in params:
            params["Media"] = {"MediaFileUri": f"s3://{s3_bucket}/{s3_key}"}

        # upload test wav to a s3 bucket
        with open(audio_file, "rb") as f:
            aws_client.s3.upload_fileobj(f, s3_bucket, s3_key)

        job = aws_client.transcribe.start_transcription_job(**params)
        job_name = job["TranscriptionJob"]["TranscriptionJobName"]
        job_names.append(job_name)

        return job_name

    yield _create_job

    for job_name in job_names:
        with contextlib.suppress(ClientError):
            aws_client.transcribe.delete_transcription_job(TranscriptionJobName=job_name)
706

707

708
@pytest.fixture
def kinesis_create_stream(aws_client):
    """Factory fixture creating Kinesis streams; streams are force-deleted on teardown.

    Returns the stream name.
    """
    created_streams = []

    def _create_stream(**kwargs):
        if "StreamName" not in kwargs:
            kwargs["StreamName"] = f"test-stream-{short_uid()}"
        stream_name = kwargs["StreamName"]
        aws_client.kinesis.create_stream(**kwargs)
        created_streams.append(stream_name)
        return stream_name

    yield _create_stream

    for stream_name in created_streams:
        try:
            aws_client.kinesis.delete_stream(StreamName=stream_name, EnforceConsumerDeletion=True)
        except Exception as e:
            LOG.debug("error cleaning up kinesis stream %s: %s", stream_name, e)
726

727

728
@pytest.fixture
def wait_for_stream_ready(aws_client):
    """Returns a callable polling until a Kinesis stream is ACTIVE or UPDATING; returns the poll result."""

    def _wait_for_stream_ready(stream_name: str):
        def is_stream_ready():
            description = aws_client.kinesis.describe_stream(StreamName=stream_name)
            status = description["StreamDescription"]["StreamStatus"]
            return status in ("ACTIVE", "UPDATING")

        return poll_condition(is_stream_ready)

    return _wait_for_stream_ready
741

742

743
@pytest.fixture
def wait_for_delivery_stream_ready(aws_client):
    """Returns a callable polling until a Firehose delivery stream is ACTIVE; returns the poll result."""

    def _wait_for_stream_ready(delivery_stream_name: str):
        def is_stream_ready():
            describe_stream_response = aws_client.firehose.describe_delivery_stream(
                DeliveryStreamName=delivery_stream_name
            )
            return (
                describe_stream_response["DeliveryStreamDescription"]["DeliveryStreamStatus"]
                == "ACTIVE"
            )

        # return the poll result for consistency with wait_for_stream_ready /
        # wait_for_dynamodb_stream_ready, so callers can detect a timeout
        # (previously the result was silently discarded)
        return poll_condition(is_stream_ready)

    return _wait_for_stream_ready
758

759

760
@pytest.fixture
def wait_for_dynamodb_stream_ready(aws_client):
    """Returns a callable polling until a DynamoDB stream is ENABLED; returns the poll result."""

    def _wait_for_stream_ready(stream_arn: str):
        def is_stream_ready():
            description = aws_client.dynamodbstreams.describe_stream(StreamArn=stream_arn)
            return description["StreamDescription"]["StreamStatus"] == "ENABLED"

        return poll_condition(is_stream_ready)

    return _wait_for_stream_ready
772

773

774
@pytest.fixture()
def kms_create_key(aws_client_factory):
    """Factory fixture creating KMS keys (optionally per region); keys are scheduled for deletion on teardown.

    Returns the key's KeyMetadata.
    """
    created_keys = []

    def _create_key(region_name: str = None, **kwargs):
        if "Description" not in kwargs:
            kwargs["Description"] = f"test description - {short_uid()}"
        kms = aws_client_factory(region_name=region_name).kms
        key_metadata = kms.create_key(**kwargs)["KeyMetadata"]
        created_keys.append((region_name, key_metadata["KeyId"]))
        return key_metadata

    yield _create_key

    for region_name, key_id in created_keys:
        try:
            # 7 days is the shortest deletion window KMS allows
            aws_client_factory(region_name=region_name).kms.schedule_key_deletion(
                KeyId=key_id, PendingWindowInDays=7
            )
        except Exception as e:
            exception_message = str(e)
            # Some tests schedule their keys for deletion themselves; only log
            # failures that are not "key already pending deletion".
            if not (
                "KMSInvalidStateException" in exception_message
                and "is pending deletion" in exception_message
            ):
                LOG.debug("error cleaning up KMS key %s: %s", key_id, e)
803

804

805
@pytest.fixture()
def kms_replicate_key(aws_client_factory):
    """Factory fixture replicating a KMS key into another region; replicas are scheduled for deletion on teardown."""
    replica_keys = []

    def _replicate_key(region_from=None, **kwargs):
        replica_keys.append((kwargs.get("ReplicaRegion"), kwargs.get("KeyId")))
        return aws_client_factory(region_name=region_from).kms.replicate_key(**kwargs)

    yield _replicate_key

    for region_to, key_id in replica_keys:
        try:
            # 7 days is the shortest deletion window KMS allows
            aws_client_factory(region_name=region_to).kms.schedule_key_deletion(
                KeyId=key_id, PendingWindowInDays=7
            )
        except Exception as e:
            LOG.debug("error cleaning up KMS key %s: %s", key_id, e)
824

825

826
# kms_create_key fixture is used here not just to be able to create aliases without a key specified,
827
# but also to make sure that kms_create_key gets executed before and teared down after kms_create_alias -
828
# to make sure that we clean up aliases before keys get cleaned up.
829
@pytest.fixture()
def kms_create_alias(kms_create_key, aws_client):
    """Factory fixture creating KMS aliases (with a fresh backing key when none is
    given); deletes all created aliases on teardown."""
    created_aliases = []

    def _create_alias(**kwargs):
        if "AliasName" not in kwargs:
            kwargs["AliasName"] = f"alias/{short_uid()}"
        # Only create a backing key when the caller did not provide one.
        if "TargetKeyId" not in kwargs:
            kwargs["TargetKeyId"] = kms_create_key()["KeyId"]

        alias_name = kwargs["AliasName"]
        aws_client.kms.create_alias(**kwargs)
        created_aliases.append(alias_name)
        return alias_name

    yield _create_alias

    for alias in created_aliases:
        try:
            aws_client.kms.delete_alias(AliasName=alias)
        except Exception as e:
            LOG.debug("error cleaning up KMS alias %s: %s", alias, e)
1✔
850

851

852
@pytest.fixture()
def kms_create_grant(kms_create_key, aws_client):
    """Factory fixture creating KMS grants; retires all created grants on teardown."""
    created_grants = []

    def _create_grant(**kwargs):
        # Just a random ARN, since KMS in LocalStack currently doesn't validate GranteePrincipal,
        # but some GranteePrincipal is required to create a grant.
        GRANTEE_PRINCIPAL_ARN = (
            "arn:aws:kms:eu-central-1:123456789876:key/198a5a78-52c3-489f-ac70-b06a4d11027a"
        )

        kwargs.setdefault("Operations", ["Decrypt", "Encrypt"])
        kwargs.setdefault("GranteePrincipal", GRANTEE_PRINCIPAL_ARN)
        # Only create a backing key when the caller did not provide one.
        if "KeyId" not in kwargs:
            kwargs["KeyId"] = kms_create_key()["KeyId"]

        key_id = kwargs["KeyId"]
        grant_id = aws_client.kms.create_grant(**kwargs)["GrantId"]
        created_grants.append((grant_id, key_id))
        return grant_id, key_id

    yield _create_grant

    for grant_id, key_id in created_grants:
        try:
            aws_client.kms.retire_grant(GrantId=grant_id, KeyId=key_id)
        except Exception as e:
            LOG.debug("error cleaning up KMS grant %s: %s", grant_id, e)
×
881

882

883
@pytest.fixture
def kms_key(kms_create_key):
    # Convenience fixture: a single KMS key with default settings.
    # Cleanup (scheduled key deletion) is handled by the kms_create_key factory.
    return kms_create_key()
1✔
886

887

888
@pytest.fixture
def kms_grant_and_key(kms_key, aws_client):
    """Returns ``[grant, key]``: a Decrypt/Encrypt grant on the ``kms_key`` fixture
    for the current caller identity."""
    grantee_arn = aws_client.sts.get_caller_identity()["Arn"]
    grant = aws_client.kms.create_grant(
        KeyId=kms_key["KeyId"],
        GranteePrincipal=grantee_arn,
        Operations=["Decrypt", "Encrypt"],
    )
    return [grant, kms_key]
900

901

902
@pytest.fixture
def opensearch_wait_for_cluster(aws_client):
    """Provides a helper that blocks until an OpenSearch domain has finished
    processing and exposes an endpoint."""

    def _wait_for_cluster(domain_name: str):
        def finished_processing():
            domain_status = aws_client.opensearch.describe_domain(DomainName=domain_name)[
                "DomainStatus"
            ]
            return domain_status["Processing"] is False and "Endpoint" in domain_status

        # Against real AWS, domain creation is slow - poll less aggressively there.
        poll_kwargs = {"interval": 10} if is_aws_cloud() else {}
        assert poll_condition(
            finished_processing, timeout=25 * 60, **poll_kwargs
        ), f"could not start domain: {domain_name}"

    return _wait_for_cluster
1✔
914

915

916
@pytest.fixture
def opensearch_create_domain(opensearch_wait_for_cluster, aws_client):
    """Factory fixture creating OpenSearch domains (waiting until each is ready);
    deletes all created domains on teardown."""
    created_domains = []

    def factory(**kwargs) -> str:
        if "DomainName" not in kwargs:
            kwargs["DomainName"] = f"test-domain-{short_uid()}"
        domain_name = kwargs["DomainName"]

        aws_client.opensearch.create_domain(**kwargs)
        # Block until the cluster is processing=False and has an endpoint.
        opensearch_wait_for_cluster(domain_name=domain_name)

        created_domains.append(domain_name)
        return domain_name

    yield factory

    # cleanup
    for domain in created_domains:
        try:
            aws_client.opensearch.delete_domain(DomainName=domain)
        except Exception as e:
            LOG.debug("error cleaning up domain %s: %s", domain, e)
×
939

940

941
@pytest.fixture
def opensearch_domain(opensearch_create_domain) -> str:
    # Convenience fixture: a single OpenSearch domain with default settings.
    # Cleanup is handled by the opensearch_create_domain factory.
    return opensearch_create_domain()
1✔
944

945

946
@pytest.fixture
def opensearch_endpoint(opensearch_domain, aws_client) -> str:
    """HTTPS endpoint URL of the ``opensearch_domain`` fixture."""
    domain_status = aws_client.opensearch.describe_domain(DomainName=opensearch_domain)[
        "DomainStatus"
    ]
    assert "Endpoint" in domain_status
    return f"https://{domain_status['Endpoint']}"
1✔
951

952

953
@pytest.fixture
def opensearch_document_path(opensearch_endpoint, aws_client):
    """Indexes a sample document into the test domain and returns its document URL."""
    sample_document = {
        "first_name": "Boba",
        "last_name": "Fett",
        "age": 41,
        "about": "I'm just a simple man, trying to make my way in the universe.",
        "interests": ["mandalorian armor", "tusken culture"],
    }
    document_path = f"{opensearch_endpoint}/bountyhunters/_doc/1"
    headers = {"content-type": "application/json", "Accept-encoding": "identity"}
    response = requests.put(document_path, data=json.dumps(sample_document), headers=headers)
    assert response.status_code == 201, f"could not create document at: {document_path}"
    return document_path
1✔
970

971

972
# Cleanup fixtures
973
@pytest.fixture
def cleanup_stacks(aws_client):
    """Helper deleting the given CloudFormation stacks and waiting for each
    deletion to complete (best-effort)."""

    def _cleanup_stacks(stacks: List[str]) -> None:
        for stack in ensure_list(stacks):
            try:
                aws_client.cloudformation.delete_stack(StackName=stack)
                aws_client.cloudformation.get_waiter("stack_delete_complete").wait(StackName=stack)
            except Exception:
                LOG.debug("Failed to cleanup stack '%s'", stack)

    return _cleanup_stacks
1✔
985

986

987
@pytest.fixture
def cleanup_changesets(aws_client):
    """Helper deleting the given CloudFormation change sets (best-effort)."""

    def _cleanup_changesets(changesets: List[str]) -> None:
        for cs in ensure_list(changesets):
            try:
                aws_client.cloudformation.delete_change_set(ChangeSetName=cs)
            except Exception:
                LOG.debug("Failed to cleanup changeset '%s'", cs)

    return _cleanup_changesets
1✔
998

999

1000
# Helpers for Cfn
1001

1002

1003
# TODO: exports(!)
1004
@dataclasses.dataclass(frozen=True)
class DeployResult:
    """Immutable result of a successful deploy_cfn_template() call."""

    change_set_id: str
    stack_id: str
    stack_name: str
    change_set_name: str
    outputs: Dict[str, str]  # stack outputs mapped OutputKey -> OutputValue

    destroy: Callable[[], None]  # deletes the stack and waits for deletion to complete
1✔
1013

1014

1015
class StackDeployError(Exception):
    """Raised when a CloudFormation stack fails to reach its target state; the
    message contains the stack description plus (failing) stack events."""

    def __init__(self, describe_res: dict, events: list[dict]):
        self.describe_result = describe_res
        self.events = events

        describe_json = json.dumps(self.describe_result, cls=CustomEncoder)
        if config.CFN_VERBOSE_ERRORS:
            msg = f"Describe output:\n{describe_json}\nEvents:\n{self.format_events(events)}"
        else:
            msg = f"Describe output:\n{describe_json}\nFailing resources:\n{self.format_events(events)}"

        super().__init__(msg)

    def format_events(self, events: list[dict]) -> str:
        # Render events oldest-first; only failures unless verbose errors are enabled.
        ordered_events = sorted(events, key=lambda event: event["Timestamp"])
        rendered = [
            self.format_event(event)
            for event in ordered_events
            if event["ResourceStatus"].endswith("FAILED") or config.CFN_VERBOSE_ERRORS
        ]
        return "\n".join(rendered)

    @staticmethod
    def format_event(event: dict) -> str:
        base = f"- {event['LogicalResourceId']} ({event['ResourceType']}) -> {event['ResourceStatus']}"
        reason = event.get("ResourceStatusReason")
        if reason:
            # Keep each event on a single line for readability of the final message.
            reason = reason.replace("\n", "; ")
            return f"{base} ({reason})"
        return base
×
1045

1046

1047
@pytest.fixture
def deploy_cfn_template(
    aws_client: ServiceLevelClientFactory,
):
    """Factory fixture deploying CloudFormation templates via change sets.

    The returned callable creates (or updates) a stack, waits for it to
    stabilize, and returns a DeployResult. All deployed stacks are destroyed
    on teardown (newest first, in case of inter-stack dependencies).
    """
    # (stack_id, destroy-callable) pairs, in creation order.
    state: list[tuple[str, Callable]] = []

    def _deploy(
        *,
        is_update: Optional[bool] = False,
        stack_name: Optional[str] = None,
        change_set_name: Optional[str] = None,
        template: Optional[str] = None,
        template_path: Optional[str | os.PathLike] = None,
        template_mapping: Optional[Dict[str, Any]] = None,
        parameters: Optional[Dict[str, str]] = None,
        role_arn: Optional[str] = None,
        max_wait: Optional[int] = None,
        delay_between_polls: Optional[int] = 2,
        custom_aws_client: Optional[ServiceLevelClientFactory] = None,
    ) -> DeployResult:
        # Updating requires an explicit, existing stack name.
        if is_update:
            assert stack_name
        stack_name = stack_name or f"stack-{short_uid()}"
        change_set_name = change_set_name or f"change-set-{short_uid()}"

        if max_wait is None:
            # Real AWS deployments can take much longer than LocalStack ones.
            max_wait = 1800 if is_aws_cloud() else 180

        # template_path (if given) takes precedence over an inline template string.
        if template_path is not None:
            template = load_template_file(template_path)
        template_rendered = render_template(template, **(template_mapping or {}))

        kwargs = dict(
            StackName=stack_name,
            ChangeSetName=change_set_name,
            TemplateBody=template_rendered,
            Capabilities=["CAPABILITY_AUTO_EXPAND", "CAPABILITY_IAM", "CAPABILITY_NAMED_IAM"],
            ChangeSetType=("UPDATE" if is_update else "CREATE"),
            Parameters=[
                {
                    "ParameterKey": k,
                    "ParameterValue": v,
                }
                for (k, v) in (parameters or {}).items()
            ],
        )
        if role_arn is not None:
            kwargs["RoleARN"] = role_arn

        # Allow deploying with a different account/region via custom_aws_client.
        cfn_aws_client = custom_aws_client if custom_aws_client is not None else aws_client

        response = cfn_aws_client.cloudformation.create_change_set(**kwargs)

        change_set_id = response["Id"]
        stack_id = response["StackId"]

        # Wait for the change set to be computed, then execute it.
        cfn_aws_client.cloudformation.get_waiter(WAITER_CHANGE_SET_CREATE_COMPLETE).wait(
            ChangeSetName=change_set_id
        )
        cfn_aws_client.cloudformation.execute_change_set(ChangeSetName=change_set_id)
        stack_waiter = cfn_aws_client.cloudformation.get_waiter(
            WAITER_STACK_UPDATE_COMPLETE if is_update else WAITER_STACK_CREATE_COMPLETE
        )

        try:
            stack_waiter.wait(
                StackName=stack_id,
                WaiterConfig={
                    "Delay": delay_between_polls,
                    "MaxAttempts": max_wait / delay_between_polls,
                },
            )
        except botocore.exceptions.WaiterError as e:
            # Surface the stack description and events for easier debugging.
            raise StackDeployError(
                cfn_aws_client.cloudformation.describe_stacks(StackName=stack_id)["Stacks"][0],
                cfn_aws_client.cloudformation.describe_stack_events(StackName=stack_id)[
                    "StackEvents"
                ],
            ) from e

        describe_stack_res = cfn_aws_client.cloudformation.describe_stacks(StackName=stack_id)[
            "Stacks"
        ][0]
        outputs = describe_stack_res.get("Outputs", [])

        mapped_outputs = {o["OutputKey"]: o.get("OutputValue") for o in outputs}

        def _destroy_stack():
            # Delete the stack and block until the deletion finished.
            cfn_aws_client.cloudformation.delete_stack(StackName=stack_id)
            cfn_aws_client.cloudformation.get_waiter(WAITER_STACK_DELETE_COMPLETE).wait(
                StackName=stack_id,
                WaiterConfig={
                    "Delay": delay_between_polls,
                    "MaxAttempts": max_wait / delay_between_polls,
                },
            )

        state.append((stack_id, _destroy_stack))

        return DeployResult(
            change_set_id, stack_id, stack_name, change_set_name, mapped_outputs, _destroy_stack
        )

    yield _deploy

    # delete the stacks in the reverse order they were created in case of inter-stack dependencies
    for stack_id, teardown in state[::-1]:
        try:
            teardown()
        except Exception as e:
            LOG.debug("Failed cleaning up stack stack_id=%s: %s", stack_id, e)
1✔
1158

1159

1160
@pytest.fixture
def is_change_set_created_and_available(aws_client):
    """Predicate factory for polling until a change set is created and executable."""

    def _is_change_set_created_and_available(change_set_id: str):
        def _inner():
            # TODO: CREATE_FAILED should also not lead to further retries
            change_set = aws_client.cloudformation.describe_change_set(ChangeSetName=change_set_id)
            return (
                change_set.get("Status") == "CREATE_COMPLETE"
                and change_set.get("ExecutionStatus") == "AVAILABLE"
            )

        return _inner

    return _is_change_set_created_and_available
1✔
1174

1175

1176
@pytest.fixture
def is_change_set_failed_and_unavailable(aws_client):
    """Predicate factory for polling until a change set has FAILED and is not executable."""

    # fix: the inner helpers previously reused the name
    # `_is_change_set_created_and_available` from the sibling fixture (copy-paste),
    # which was misleading in stack traces and debugger output.
    def _is_change_set_failed_and_unavailable(change_set_id: str):
        def _inner():
            change_set = aws_client.cloudformation.describe_change_set(ChangeSetName=change_set_id)
            return (
                change_set.get("Status") == "FAILED"
                and change_set.get("ExecutionStatus") == "UNAVAILABLE"
            )

        return _inner

    return _is_change_set_failed_and_unavailable
×
1190

1191

1192
@pytest.fixture
def is_stack_created(aws_client):
    # Predicate factory: stack reached a terminal CREATE state (complete or failed).
    return _has_stack_status(aws_client.cloudformation, ["CREATE_COMPLETE", "CREATE_FAILED"])
1✔
1195

1196

1197
@pytest.fixture
def is_stack_updated(aws_client):
    # Predicate factory: stack reached a terminal UPDATE state (complete or failed).
    return _has_stack_status(aws_client.cloudformation, ["UPDATE_COMPLETE", "UPDATE_FAILED"])
×
1200

1201

1202
@pytest.fixture
def is_stack_deleted(aws_client):
    # Predicate factory: stack deletion completed.
    return _has_stack_status(aws_client.cloudformation, ["DELETE_COMPLETE"])
×
1205

1206

1207
def _has_stack_status(cfn_client, statuses: List[str]):
1✔
1208
    def _has_status(stack_id: str):
1✔
1209
        def _inner():
1✔
1210
            resp = cfn_client.describe_stacks(StackName=stack_id)
1✔
1211
            s = resp["Stacks"][0]  # since the lookup  uses the id we can only get a single response
1✔
1212
            return s.get("StackStatus") in statuses
1✔
1213

1214
        return _inner
1✔
1215

1216
    return _has_status
1✔
1217

1218

1219
@pytest.fixture
def is_change_set_finished(aws_client):
    """Predicate factory: change set execution completed; short-circuits polling
    when execution failed."""

    def _is_change_set_finished(change_set_id: str, stack_name: Optional[str] = None):
        def _inner():
            describe_kwargs = {"ChangeSetName": change_set_id}
            if stack_name:
                describe_kwargs["StackName"] = stack_name

            check_set = aws_client.cloudformation.describe_change_set(**describe_kwargs)
            execution_status = check_set.get("ExecutionStatus")

            if execution_status == "EXECUTE_FAILED":
                # a failed execution can never complete anymore - stop polling early
                LOG.warning("Change set failed")
                raise ShortCircuitWaitException()

            return execution_status == "EXECUTE_COMPLETE"

        return _inner

    return _is_change_set_finished
1✔
1238

1239

1240
@pytest.fixture
def wait_until_lambda_ready(aws_client):
    """Returns a helper blocking until a Lambda function (optionally a specific
    qualifier, i.e. version or alias) has left the "Pending" state.

    :param function_name: name of the function to poll
    :param qualifier: optional version/alias to check instead of $LATEST
    :param client: optional lambda client override (defaults to aws_client.lambda_)
    """

    def _wait_until_ready(function_name: str, qualifier: str = None, client=None):
        client = client or aws_client.lambda_

        def _is_not_pending():
            kwargs = {}
            if qualifier:
                kwargs["Qualifier"] = qualifier
            try:
                # fix: the Qualifier kwarg was previously built but never passed to
                # get_function, so the qualifier was silently ignored
                result = (
                    client.get_function(FunctionName=function_name, **kwargs)["Configuration"][
                        "State"
                    ]
                    != "Pending"
                )
                LOG.debug("lambda state result: result=%s", result)
                return result
            except Exception as e:
                LOG.error(e)
                raise

        wait_until(_is_not_pending)

    return _wait_until_ready
1✔
1263

1264

1265
role_assume_policy = """
1✔
1266
{
1267
  "Version": "2012-10-17",
1268
  "Statement": [
1269
    {
1270
      "Effect": "Allow",
1271
      "Principal": {
1272
        "Service": "lambda.amazonaws.com"
1273
      },
1274
      "Action": "sts:AssumeRole"
1275
    }
1276
  ]
1277
}
1278
""".strip()
1279

1280
role_policy = """
1✔
1281
{
1282
    "Version": "2012-10-17",
1283
    "Statement": [
1284
        {
1285
            "Effect": "Allow",
1286
            "Action": [
1287
                "logs:CreateLogGroup",
1288
                "logs:CreateLogStream",
1289
                "logs:PutLogEvents"
1290
            ],
1291
            "Resource": [
1292
                "*"
1293
            ]
1294
        }
1295
    ]
1296
}
1297
""".strip()
1298

1299

1300
@pytest.fixture
def create_lambda_function_aws(aws_client):
    """Factory fixture creating Lambda functions via the raw CreateFunction API;
    waits until each is out of "Pending" and deletes them on teardown."""
    created_function_arns = []

    def _create_lambda_function(**kwargs):
        def _create_function():
            response = aws_client.lambda_.create_function(**kwargs)
            created_function_arns.append(response["FunctionArn"])

            def _is_not_pending():
                try:
                    state = aws_client.lambda_.get_function(
                        FunctionName=response["FunctionName"]
                    )["Configuration"]["State"]
                    return state != "Pending"
                except Exception as e:
                    LOG.error(e)
                    raise

            wait_until(_is_not_pending)
            return response

        # @AWS, takes about 10s until the role/policy is "active", until then it will fail
        # localstack should normally not require the retries and will just continue here
        return retry(_create_function, retries=3, sleep=4)

    yield _create_lambda_function

    for arn in created_function_arns:
        try:
            aws_client.lambda_.delete_function(FunctionName=arn)
        except Exception:
            LOG.debug("Unable to delete function arn=%s in cleanup", arn)
×
1336

1337

1338
@pytest.fixture
def create_lambda_function(aws_client, wait_until_lambda_ready, lambda_su_role):
    """Factory fixture creating Lambda functions via testutil helpers.

    Supports a per-call ``client`` override (e.g. for a different account/region);
    tracks which client created each function so teardown deletes the function via
    the same client, and also removes the function's CloudWatch log group.
    """
    lambda_arns_and_clients = []  # (function ARN, client that created it)
    log_groups = []
    lambda_client = aws_client.lambda_
    logs_client = aws_client.logs
    s3_client = aws_client.s3

    def _create_lambda_function(*args, **kwargs):
        # Default to the fixture's lambda client unless the caller provided one.
        client = kwargs.get("client") or lambda_client
        kwargs["client"] = client
        kwargs["s3_client"] = s3_client
        func_name = kwargs.get("func_name")
        assert func_name
        del kwargs["func_name"]

        # Fall back to the superuser test role when no role is given.
        if not kwargs.get("role"):
            kwargs["role"] = lambda_su_role

        def _create_function():
            resp = testutil.create_lambda_function(func_name, **kwargs)
            lambda_arns_and_clients.append((resp["CreateFunctionResponse"]["FunctionArn"], client))
            wait_until_lambda_ready(function_name=func_name, client=client)
            log_group_name = f"/aws/lambda/{func_name}"
            log_groups.append(log_group_name)
            return resp

        # @AWS, takes about 10s until the role/policy is "active", until then it will fail
        # localstack should normally not require the retries and will just continue here
        return retry(_create_function, retries=3, sleep=4)

    yield _create_lambda_function

    # Best-effort cleanup: functions first, then their log groups.
    for arn, client in lambda_arns_and_clients:
        try:
            client.delete_function(FunctionName=arn)
        except Exception:
            LOG.debug("Unable to delete function arn=%s in cleanup", arn)

    for log_group_name in log_groups:
        try:
            logs_client.delete_log_group(logGroupName=log_group_name)
        except Exception:
            LOG.debug("Unable to delete log group %s in cleanup", log_group_name)
1✔
1382

1383

1384
@pytest.fixture
def create_echo_http_server(aws_client, create_lambda_function):
    """Factory fixture deploying a Lambda function URL that echoes incoming HTTP
    requests back as JSON; Lambda cleanup is delegated to create_lambda_function."""
    from localstack.aws.api.lambda_ import Runtime

    lambda_client = aws_client.lambda_
    # Source code of the deployed echo handler; TRIM_X_HEADERS controls whether
    # lambda-added x-amzn*/x-forwarded-* headers are stripped from the echo.
    handler_code = textwrap.dedent(
        """
    import json
    import os


    def make_response(body: dict, status_code: int = 200):
        return {
            "statusCode": status_code,
            "headers": {"Content-Type": "application/json"},
            "body": body,
        }


    def trim_headers(headers):
        if not int(os.getenv("TRIM_X_HEADERS", 0)):
            return headers
        return {
            key: value for key, value in headers.items()
            if not (key.startswith("x-amzn") or key.startswith("x-forwarded-"))
        }


    def handler(event, context):
        print(json.dumps(event))
        response = {
            "args": event.get("queryStringParameters", {}),
            "data": event.get("body", ""),
            "domain": event["requestContext"].get("domainName", ""),
            "headers": trim_headers(event.get("headers", {})),
            "method": event["requestContext"]["http"].get("method", ""),
            "origin": event["requestContext"]["http"].get("sourceIp", ""),
            "path": event["requestContext"]["http"].get("path", ""),
        }
        return make_response(response)"""
    )

    def _create_echo_http_server(trim_x_headers: bool = False) -> str:
        """Creates a server that will echo any request. Any request will be returned with the
        following format. Any unset values will have those defaults.
        `trim_x_headers` can be set to True to trim some headers that are automatically added by lambda in
        order to create easier Snapshot testing. Default: `False`
        {
          "args": {},
          "headers": {},
          "data": "",
          "method": "",
          "domain": "",
          "origin": "",
          "path": ""
        }"""
        zip_file = testutil.create_lambda_archive(handler_code, get_content=True)
        func_name = f"echo-http-{short_uid()}"
        create_lambda_function(
            func_name=func_name,
            zip_file=zip_file,
            runtime=Runtime.python3_12,
            envvars={"TRIM_X_HEADERS": "1" if trim_x_headers else "0"},
        )
        url_response = lambda_client.create_function_url_config(
            FunctionName=func_name, AuthType="NONE"
        )
        # Allow unauthenticated invocations of the function URL.
        aws_client.lambda_.add_permission(
            FunctionName=func_name,
            StatementId="urlPermission",
            Action="lambda:InvokeFunctionUrl",
            Principal="*",
            FunctionUrlAuthType="NONE",
        )
        return url_response["FunctionUrl"]

    yield _create_echo_http_server
1✔
1461

1462

1463
@pytest.fixture
def create_event_source_mapping(aws_client):
    """Factory fixture creating Lambda event source mappings; deletes them on teardown."""
    mapping_uuids = []

    def _create_event_source_mapping(*args, **kwargs):
        response = aws_client.lambda_.create_event_source_mapping(*args, **kwargs)
        mapping_uuids.append(response["UUID"])
        return response

    yield _create_event_source_mapping

    for uuid in mapping_uuids:
        try:
            aws_client.lambda_.delete_event_source_mapping(UUID=uuid)
        except Exception:
            LOG.debug("Unable to delete event source mapping %s in cleanup", uuid)
×
1479

1480

1481
@pytest.fixture
def check_lambda_logs(aws_client):
    """Returns a helper asserting that the expected lines appear in a Lambda
    function's CloudWatch logs; returns all log messages."""

    def _check_logs(func_name: str, expected_lines: List[str] = None) -> List[str]:
        log_messages = [
            event["message"]
            for event in get_lambda_logs(func_name, logs_client=aws_client.logs)
        ]
        for line in expected_lines or []:
            if ".*" in line:
                # expected line contains a wildcard - treat it as a regex pattern
                if any(re.match(line, message, flags=re.DOTALL) for message in log_messages):
                    continue
            assert line in log_messages
        return log_messages

    return _check_logs
1✔
1497

1498

1499
@pytest.fixture
def create_policy(aws_client):
    """Factory fixture creating IAM policies; deletes them on teardown using the
    same client they were created with."""
    created_policies = []

    def _create_policy(*args, iam_client=None, **kwargs):
        iam_client = iam_client or aws_client.iam
        if "PolicyName" not in kwargs:
            kwargs["PolicyName"] = f"policy-{short_uid()}"
        response = iam_client.create_policy(*args, **kwargs)
        # remember the creating client so cleanup targets the same account/region
        created_policies.append((response["Policy"]["Arn"], iam_client))
        return response

    yield _create_policy

    for policy_arn, iam_client in created_policies:
        try:
            iam_client.delete_policy(PolicyArn=policy_arn)
        except Exception:
            LOG.debug("Could not delete policy '%s' during test cleanup", policy_arn)
1✔
1519

1520

1521
@pytest.fixture
def create_user(aws_client):
    """Factory fixture creating IAM users.

    Teardown removes each user's inline policies, detaches managed policies, and
    deletes access keys before deleting the user itself (IAM requires an empty
    user for deletion). All steps are best-effort.
    """
    usernames = []

    def _create_user(**kwargs):
        if "UserName" not in kwargs:
            kwargs["UserName"] = f"user-{short_uid()}"
        response = aws_client.iam.create_user(**kwargs)
        usernames.append(response["User"]["UserName"])
        return response

    yield _create_user

    for username in usernames:
        try:
            inline_policies = aws_client.iam.list_user_policies(UserName=username)["PolicyNames"]
        except ClientError as e:
            # user already gone (e.g. deleted by the test itself) - nothing to clean up
            LOG.debug(
                "Cannot list user policies: %s. User %s probably already deleted...", e, username
            )
            continue

        # 1) delete inline policies
        for inline_policy in inline_policies:
            try:
                aws_client.iam.delete_user_policy(UserName=username, PolicyName=inline_policy)
            except Exception:
                LOG.debug(
                    "Could not delete user policy '%s' from '%s' during cleanup",
                    inline_policy,
                    username,
                )
        # 2) detach managed policies
        attached_policies = aws_client.iam.list_attached_user_policies(UserName=username)[
            "AttachedPolicies"
        ]
        for attached_policy in attached_policies:
            try:
                aws_client.iam.detach_user_policy(
                    UserName=username, PolicyArn=attached_policy["PolicyArn"]
                )
            except Exception:
                LOG.debug(
                    "Error detaching policy '%s' from user '%s'",
                    attached_policy["PolicyArn"],
                    username,
                )
        # 3) delete access keys
        access_keys = aws_client.iam.list_access_keys(UserName=username)["AccessKeyMetadata"]
        for access_key in access_keys:
            try:
                aws_client.iam.delete_access_key(
                    UserName=username, AccessKeyId=access_key["AccessKeyId"]
                )
            except Exception:
                LOG.debug(
                    "Error deleting access key '%s' from user '%s'",
                    access_key["AccessKeyId"],
                    username,
                )

        # 4) finally delete the (now empty) user
        try:
            aws_client.iam.delete_user(UserName=username)
        except Exception as e:
            LOG.debug("Error deleting user '%s' during test cleanup: %s", username, e)
1✔
1583

1584

1585
@pytest.fixture
def wait_and_assume_role(aws_client):
    """Returns a helper that assumes an IAM role and returns its credentials,
    retrying until IAM propagation permits the assume-role call."""

    def _wait_and_assume_role(role_arn: str, session_name: str = None, **kwargs):
        session_name = session_name or f"session-{short_uid()}"

        def assume_role():
            return aws_client.sts.assume_role(
                RoleArn=role_arn, RoleSessionName=session_name, **kwargs
            )["Credentials"]

        # need to retry a couple of times before we are allowed to assume this role in AWS
        return retry(assume_role, sleep=5, retries=4)

    return _wait_and_assume_role
1✔
1601

1602

1603
@pytest.fixture
def create_role(aws_client):
    """Factory fixture creating IAM roles (optionally via a custom iam_client).

    Teardown detaches managed policies and deletes inline policies before
    deleting each role (IAM requires an empty role for deletion), using the same
    client the role was created with. All steps are best-effort.
    """
    role_names = []  # (role name, client that created it)

    def _create_role(iam_client=None, **kwargs):
        if not kwargs.get("RoleName"):
            kwargs["RoleName"] = f"role-{short_uid()}"
        iam_client = iam_client or aws_client.iam
        result = iam_client.create_role(**kwargs)
        role_names.append((result["Role"]["RoleName"], iam_client))
        return result

    yield _create_role

    for role_name, iam_client in role_names:
        # detach policies
        try:
            attached_policies = iam_client.list_attached_role_policies(RoleName=role_name)[
                "AttachedPolicies"
            ]
        except ClientError as e:
            # role already gone (e.g. deleted by the test itself) - nothing to clean up
            LOG.debug(
                "Cannot list attached role policies: %s. Role %s probably already deleted...",
                e,
                role_name,
            )
            continue
        for attached_policy in attached_policies:
            try:
                iam_client.detach_role_policy(
                    RoleName=role_name, PolicyArn=attached_policy["PolicyArn"]
                )
            except Exception:
                LOG.debug(
                    "Could not detach role policy '%s' from '%s' during cleanup",
                    attached_policy["PolicyArn"],
                    role_name,
                )
        # delete inline policies
        role_policies = iam_client.list_role_policies(RoleName=role_name)["PolicyNames"]
        for role_policy in role_policies:
            try:
                iam_client.delete_role_policy(RoleName=role_name, PolicyName=role_policy)
            except Exception:
                LOG.debug(
                    "Could not delete role policy '%s' from '%s' during cleanup",
                    role_policy,
                    role_name,
                )
        # finally delete the (now empty) role
        try:
            iam_client.delete_role(RoleName=role_name)
        except Exception:
            LOG.debug("Could not delete role '%s' during cleanup", role_name)
×
1655

1656

1657
@pytest.fixture
def create_parameter(aws_client):
    """Factory fixture to create SSM parameters; all created parameters are deleted on teardown."""
    created_names = []

    def _create_parameter(**kwargs):
        created_names.append(kwargs["Name"])
        return aws_client.ssm.put_parameter(**kwargs)

    yield _create_parameter

    # teardown: remove every parameter created through this factory
    for name in created_names:
        aws_client.ssm.delete_parameter(Name=name)
1669

1670

1671
@pytest.fixture
def create_secret(aws_client):
    """Factory fixture to create Secrets Manager secrets; force-deletes them on teardown."""
    created_arns = []

    def _create_secret(**kwargs):
        response = aws_client.secretsmanager.create_secret(**kwargs)
        created_arns.append(response["ARN"])
        return response

    yield _create_secret

    # teardown: delete without a recovery window so secret names are reusable immediately
    for secret_arn in created_arns:
        aws_client.secretsmanager.delete_secret(
            SecretId=secret_arn, ForceDeleteWithoutRecovery=True
        )
1684

1685

1686
# TODO Figure out how to make cert creation tests pass against AWS.
1687
#
1688
# We would like to have localstack tests to pass not just against localstack, but also against AWS to make sure
1689
# our emulation is correct. Unfortunately, with certificate creation there are some issues.
1690
#
1691
# In AWS newly created ACM certificates have to be validated either by email or by DNS. The latter is
1692
# by adding some CNAME records as requested by AWS in response to a certificate request.
1693
# For testing purposes the DNS one seems to be easier, at least as long as DNS is handled by the Route 53 AWS DNS service.
1694
#
1695
# The other possible option is to use IAM certificates instead of ACM ones. Those just have to be uploaded from files
1696
# created by openssl etc. Not sure if there are other issues after that.
1697
#
1698
# The third option might be having in AWS some certificates created in advance - so they do not require validation
1699
# and can be easily used in tests. The issue with such an approach is that for AppSync, for example, in order to
1700
# register a domain name (https://docs.aws.amazon.com/appsync/latest/APIReference/API_CreateDomainName.html),
1701
# the domain name in the API request has to match the domain name used in certificate creation. Which means that with
1702
# pre-created certificates we would have to use specific domain names instead of random ones.
1703
@pytest.fixture
def acm_request_certificate(aws_client_factory):
    """Factory fixture to request ACM certificates; deletes them again on teardown."""
    requested = []

    def factory(**kwargs) -> str:
        if "DomainName" not in kwargs:
            kwargs["DomainName"] = f"test-domain-{short_uid()}.localhost.localstack.cloud"

        # optional per-call region override
        region_name = kwargs.pop("region_name", None)
        acm_client = aws_client_factory(region_name=region_name).acm

        response = acm_client.request_certificate(**kwargs)
        requested.append((response["CertificateArn"], region_name))
        return response

    yield factory

    # cleanup: delete each certificate in the region it was requested in
    for certificate_arn, region_name in requested:
        try:
            acm_client = aws_client_factory(region_name=region_name).acm
            acm_client.delete_certificate(CertificateArn=certificate_arn)
        except Exception as e:
            LOG.debug("error cleaning up certificate %s: %s", certificate_arn, e)
1728

1729

1730
# Super-user IAM policy document: allows every action on every resource.
# Used by the `lambda_su_role` fixture below to build an all-access test role.
role_policy_su = {
    "Version": "2012-10-17",
    "Statement": [{"Effect": "Allow", "Action": ["*"], "Resource": ["*"]}],
}
1734

1735

1736
@pytest.fixture(scope="session")
def lambda_su_role(aws_client):
    """Session-scoped IAM role with an attached all-access policy (see ``role_policy_su``).

    Yields the role ARN; the policy is detached and both role and policy are
    deleted again at session teardown (best-effort via ``run_safe``).
    """
    role_name = f"lambda-autogenerated-{short_uid()}"
    role = aws_client.iam.create_role(
        RoleName=role_name, AssumeRolePolicyDocument=role_assume_policy
    )["Role"]
    policy_name = f"lambda-autogenerated-{short_uid()}"
    policy_arn = aws_client.iam.create_policy(
        PolicyName=policy_name, PolicyDocument=json.dumps(role_policy_su)
    )["Policy"]["Arn"]
    aws_client.iam.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)

    if is_aws_cloud():  # dirty but necessary: wait for IAM propagation
        time.sleep(10)

    yield role["Arn"]

    # Pass the bound client methods as callables so run_safe() can swallow failures.
    # Previously the calls were evaluated eagerly and run_safe() received their
    # *result*, so a failing detach/delete raised and skipped the remaining cleanup.
    run_safe(aws_client.iam.detach_role_policy, RoleName=role_name, PolicyArn=policy_arn)
    run_safe(aws_client.iam.delete_role, RoleName=role_name)
    run_safe(aws_client.iam.delete_policy, PolicyArn=policy_arn)
1756

1757

1758
@pytest.fixture
def create_iam_role_and_attach_policy(aws_client):
    """
    Fixture that creates an IAM role with given role definition and predefined policy ARN.

    Use this fixture with AWS managed policies like 'AmazonS3ReadOnlyAccess' or 'AmazonKinesisFullAccess'.
    """
    # list of (role_name, policy_arn) pairs for teardown
    roles = []

    def _inner(**kwargs: dict[str, any]) -> str:
        """
        :param dict RoleDefinition: role definition document
        :param str PolicyArn: policy ARN
        :param str RoleName: role name (autogenerated if omitted)
        :return: role ARN
        """
        if "RoleName" not in kwargs:
            kwargs["RoleName"] = f"test-role-{short_uid()}"

        role = kwargs["RoleName"]
        role_policy = json.dumps(kwargs["RoleDefinition"])

        result = aws_client.iam.create_role(RoleName=role, AssumeRolePolicyDocument=role_policy)
        role_arn = result["Role"]["Arn"]

        policy_arn = kwargs["PolicyArn"]
        aws_client.iam.attach_role_policy(PolicyArn=policy_arn, RoleName=role)

        # remember the attached policy as well: IAM rejects DeleteRole while
        # managed policies are still attached, so teardown must detach first
        roles.append((role, policy_arn))
        return role_arn

    yield _inner

    for role, policy_arn in roles:
        try:
            aws_client.iam.detach_role_policy(RoleName=role, PolicyArn=policy_arn)
        except Exception as exc:
            LOG.debug("Error detaching policy '%s' from IAM role '%s': %s", policy_arn, role, exc)
        try:
            aws_client.iam.delete_role(RoleName=role)
        except Exception as exc:
            LOG.debug("Error deleting IAM role '%s': %s", role, exc)
1796

1797

1798
@pytest.fixture
def create_iam_role_with_policy(aws_client):
    """
    Fixture that creates an IAM role with given role definition and policy definition.
    """
    created = {}  # role name -> inline policy name, for teardown

    def _create_role_and_policy(**kwargs: dict[str, any]) -> str:
        """
        :param dict RoleDefinition: role definition document
        :param dict PolicyDefinition: policy definition document
        :param str PolicyName: policy name (autogenerated if omitted)
        :param str RoleName: role name (autogenerated if omitted)
        :return: role ARN
        """
        if "RoleName" not in kwargs:
            kwargs["RoleName"] = f"test-role-{short_uid()}"
        if "PolicyName" not in kwargs:
            kwargs["PolicyName"] = f"test-policy-{short_uid()}"
        role_name = kwargs["RoleName"]
        policy_name = kwargs["PolicyName"]

        create_result = aws_client.iam.create_role(
            RoleName=role_name, AssumeRolePolicyDocument=json.dumps(kwargs["RoleDefinition"])
        )
        aws_client.iam.put_role_policy(
            RoleName=role_name,
            PolicyName=policy_name,
            PolicyDocument=json.dumps(kwargs["PolicyDefinition"]),
        )
        created[role_name] = policy_name
        return create_result["Role"]["Arn"]

    yield _create_role_and_policy

    # teardown: remove the inline policy first, then the role itself
    for role_name, policy_name in created.items():
        try:
            aws_client.iam.delete_role_policy(RoleName=role_name, PolicyName=policy_name)
        except Exception as exc:
            LOG.debug("Error deleting IAM role policy '%s' '%s': %s", role_name, policy_name, exc)
        try:
            aws_client.iam.delete_role(RoleName=role_name)
        except Exception as exc:
            LOG.debug("Error deleting IAM role '%s': %s", role_name, exc)
1842

1843

1844
@pytest.fixture
def firehose_create_delivery_stream(wait_for_delivery_stream_ready, aws_client):
    """Factory fixture creating Firehose delivery streams; deletes them on teardown."""
    created_streams = []

    def _create_delivery_stream(**kwargs):
        if "DeliveryStreamName" not in kwargs:
            kwargs["DeliveryStreamName"] = f"test-delivery-stream-{short_uid()}"
        stream_name = kwargs["DeliveryStreamName"]
        response = aws_client.firehose.create_delivery_stream(**kwargs)
        created_streams.append(stream_name)
        # wait until the stream is ready before handing it to the test
        wait_for_delivery_stream_ready(stream_name)
        return response

    yield _create_delivery_stream

    for stream_name in created_streams:
        try:
            aws_client.firehose.delete_delivery_stream(DeliveryStreamName=stream_name)
        except Exception:
            LOG.info("Failed to delete delivery stream %s", stream_name)
1864

1865

1866
@pytest.fixture
def ses_configuration_set(aws_client):
    """Factory fixture creating SES configuration sets; deletes them on teardown."""
    created = []

    def factory(name: str) -> None:
        aws_client.ses.create_configuration_set(
            ConfigurationSet={
                "Name": name,
            },
        )
        created.append(name)

    yield factory

    for name in created:
        aws_client.ses.delete_configuration_set(ConfigurationSetName=name)
1882

1883

1884
@pytest.fixture
def ses_configuration_set_sns_event_destination(aws_client):
    """Factory fixture adding SNS event destinations to SES configuration sets.

    All created event destinations are removed again on teardown.
    """
    created = []

    def factory(config_set_name: str, event_destination_name: str, topic_arn: str) -> None:
        aws_client.ses.create_configuration_set_event_destination(
            ConfigurationSetName=config_set_name,
            EventDestination={
                "Name": event_destination_name,
                "Enabled": True,
                "MatchingEventTypes": ["send", "bounce", "delivery", "open", "click"],
                "SNSDestination": {
                    "TopicARN": topic_arn,
                },
            },
        )
        created.append((config_set_name, event_destination_name))

    yield factory

    for config_set_name, event_destination_name in created:
        aws_client.ses.delete_configuration_set_event_destination(
            ConfigurationSetName=config_set_name,
            EventDestinationName=event_destination_name,
        )
1909

1910

1911
@pytest.fixture
def ses_email_template(aws_client):
    """Factory fixture creating SES email templates; deletes them on teardown."""
    template_names = []

    def factory(name: str, contents: str, subject: str = None):
        # Generate the default subject per call. A default argument of the form
        # f"...{short_uid()}" is evaluated only once at function definition time,
        # so every call would otherwise share the same "unique" subject.
        if subject is None:
            subject = f"Email template {short_uid()}"
        aws_client.ses.create_template(
            Template={
                "TemplateName": name,
                "SubjectPart": subject,
                "TextPart": contents,
            }
        )
        template_names.append(name)

    yield factory

    for template_name in template_names:
        aws_client.ses.delete_template(TemplateName=template_name)
1929

1930

1931
@pytest.fixture
def ses_verify_identity(aws_client):
    """Factory fixture verifying SES email identities; deletes them again on teardown."""
    identities = []

    def factory(email_address: str) -> None:
        aws_client.ses.verify_email_identity(EmailAddress=email_address)
        # record the identity so the teardown loop below actually deletes it;
        # previously nothing was appended and the cleanup never ran
        identities.append(email_address)

    yield factory

    for identity in identities:
        aws_client.ses.delete_identity(Identity=identity)
1942

1943

1944
@pytest.fixture
def ec2_create_security_group(aws_client):
    """Factory fixture creating EC2 security groups with TCP ingress rules for given ports."""
    group_ids = []

    def factory(ports=None, **kwargs):
        if "GroupName" not in kwargs:
            kwargs["GroupName"] = f"test-sg-{short_uid()}"
        security_group = aws_client.ec2.create_security_group(**kwargs)

        # one world-open TCP ingress rule per requested port
        ingress_rules = []
        for port in ports or []:
            ingress_rules.append(
                {
                    "FromPort": port,
                    "IpProtocol": "tcp",
                    "IpRanges": [{"CidrIp": "0.0.0.0/0"}],
                    "ToPort": port,
                }
            )
        aws_client.ec2.authorize_security_group_ingress(
            GroupName=kwargs["GroupName"],
            IpPermissions=ingress_rules,
        )

        group_ids.append(security_group["GroupId"])
        return security_group

    yield factory

    for group_id in group_ids:
        try:
            aws_client.ec2.delete_security_group(GroupId=group_id)
        except Exception as e:
            LOG.debug("Error cleaning up EC2 security group: %s, %s", group_id, e)
1977

1978

1979
@pytest.fixture
def cleanups():
    """Yields a list to which tests append zero-argument cleanup callables.

    The callbacks run at teardown in reverse registration order; a failing
    callback is logged but does not abort the remaining cleanups.
    """
    callbacks = []

    yield callbacks

    for callback in reversed(callbacks):
        try:
            callback()
        except Exception as e:
            LOG.warning("Failed to execute cleanup", exc_info=e)
1990

1991

1992
@pytest.fixture(scope="session")
def account_id(aws_client):
    """Account ID of the primary test client (resolved via STS when available)."""
    if not (is_aws_cloud() or is_api_enabled("sts")):
        # STS not usable - fall back to the static test account ID
        return TEST_AWS_ACCOUNT_ID
    return aws_client.sts.get_caller_identity()["Account"]
1998

1999

2000
@pytest.fixture(scope="session")
def region_name(aws_client):
    """Region of the primary test client (from STS client metadata when available)."""
    if not (is_aws_cloud() or is_api_enabled("sts")):
        # STS not usable - fall back to the static test region
        return TEST_AWS_REGION_NAME
    return aws_client.sts.meta.region_name
2006

2007

2008
@pytest.fixture(scope="session")
def partition(region_name):
    # AWS partition (e.g. "aws", "aws-cn", "aws-us-gov") derived from the test region
    return get_partition(region_name)
2011

2012

2013
@pytest.fixture(scope="session")
def secondary_account_id(secondary_aws_client):
    """Account ID of the secondary (cross-account) test client."""
    if not (is_aws_cloud() or is_api_enabled("sts")):
        # STS not usable - fall back to the static secondary test account ID
        return SECONDARY_TEST_AWS_ACCOUNT_ID
    return secondary_aws_client.sts.get_caller_identity()["Account"]
2019

2020

2021
@pytest.fixture(scope="session")
def secondary_region_name():
    # static region used for cross-region test scenarios
    return SECONDARY_TEST_AWS_REGION_NAME
2024

2025

2026
@pytest.hookimpl
def pytest_collection_modifyitems(config: Config, items: list[Item]):
    """Pytest collection hook: derive extra markers from existing markers/fixtures."""
    skip_on_aws = pytest.mark.skipif(
        is_aws_cloud(),
        reason="test only applicable if run against localstack",
    )
    for item in items:
        # any marker ending in "only_localstack" implies skipping against real AWS
        for mark in item.iter_markers():
            if mark.name.endswith("only_localstack"):
                item.add_marker(skip_on_aws)
        if hasattr(item, "fixturenames") and "snapshot" in item.fixturenames:
            # add a marker that indicates that this test is snapshot validated
            # if it uses the snapshot fixture -> allows selecting only snapshot
            # validated tests in order to capture new snapshots for a whole
            # test file with "-m snapshot_validated"
            item.add_marker("snapshot_validated")
2042

2043

2044
@pytest.fixture
def sample_stores() -> AccountRegionBundle:
    """A minimal AccountRegionBundle whose store class exercises all three
    store attribute scopes (cross-account, cross-region, region-local)."""

    class SampleStore(BaseStore):
        CROSS_ACCOUNT_ATTR = CrossAccountAttribute(default=list)
        CROSS_REGION_ATTR = CrossRegionAttribute(default=list)
        region_specific_attr = LocalAttribute(default=list)

    # validate=False since "zzz" is not a real service name
    return AccountRegionBundle("zzz", SampleStore, validate=False)
2052

2053

2054
@pytest.fixture
def create_rest_apigw(aws_client_factory):
    """Factory fixture creating API Gateway REST APIs.

    The factory returns ``(api_id, api_name, root_resource_id)``. Teardown first
    deletes each API, then any usage plans (and their API keys) that referenced it.
    """
    rest_apis = []

    def _create_apigateway_function(**kwargs):
        # optional per-call region override
        region_name = kwargs.pop("region_name", None)
        client_config = None
        if is_aws_cloud():
            client_config = botocore.config.Config(
                # Api gateway can throttle requests pretty heavily. Leading to potentially undeleted apis
                retries={"max_attempts": 10, "mode": "adaptive"}
            )
        apigateway_client = aws_client_factory(
            region_name=region_name, config=client_config
        ).apigateway
        kwargs.setdefault("name", f"api-{short_uid()}")

        response = apigateway_client.create_rest_api(**kwargs)
        api_id = response.get("id")
        rest_apis.append((api_id, region_name))

        return api_id, response.get("name"), response.get("rootResourceId")

    yield _create_apigateway_function

    for rest_api_id, region_name in rest_apis:
        apigateway_client = aws_client_factory(region_name=region_name).apigateway
        # First, retrieve the usage plans associated with the REST API
        usage_plan_ids = []
        usage_plans = apigateway_client.get_usage_plans()
        for item in usage_plans.get("items", []):
            api_stages = item.get("apiStages", [])
            usage_plan_ids.extend(
                item.get("id") for api_stage in api_stages if api_stage.get("apiId") == rest_api_id
            )

        # Then delete the API, as you can't delete the UsagePlan if a stage is associated with it
        with contextlib.suppress(Exception):
            apigateway_client.delete_rest_api(restApiId=rest_api_id)

        # finally delete the usage plans and the API Keys linked to it
        for usage_plan_id in usage_plan_ids:
            usage_plan_keys = apigateway_client.get_usage_plan_keys(usagePlanId=usage_plan_id)
            for key in usage_plan_keys.get("items", []):
                apigateway_client.delete_api_key(apiKey=key["id"])
            apigateway_client.delete_usage_plan(usagePlanId=usage_plan_id)
2100

2101

2102
@pytest.fixture
def create_rest_apigw_openapi(aws_client_factory):
    """Factory fixture importing REST APIs from OpenAPI definitions; deletes them on teardown."""
    imported_apis = []

    def _create_apigateway_function(**kwargs):
        region_name = kwargs.pop("region_name", None)
        apigateway_client = aws_client_factory(region_name=region_name).apigateway

        response = apigateway_client.import_rest_api(**kwargs)
        api_id = response.get("id")
        imported_apis.append((api_id, region_name))
        return api_id, response

    yield _create_apigateway_function

    # best-effort cleanup of all imported APIs
    for api_id, region_name in imported_apis:
        with contextlib.suppress(Exception):
            aws_client_factory(region_name=region_name).apigateway.delete_rest_api(
                restApiId=api_id
            )
2121

2122

2123
@pytest.fixture
def appsync_create_api(aws_client):
    """Factory fixture creating AppSync GraphQL APIs; deletes them on teardown."""
    created_api_ids = []

    def factory(**kwargs):
        if "name" not in kwargs:
            kwargs["name"] = f"graphql-api-testing-name-{short_uid()}"
        if not kwargs.get("authenticationType"):
            kwargs["authenticationType"] = "API_KEY"

        graphql_api = aws_client.appsync.create_graphql_api(**kwargs)["graphqlApi"]
        created_api_ids.append(graphql_api["apiId"])
        return graphql_api

    yield factory

    for api_id in created_api_ids:
        try:
            aws_client.appsync.delete_graphql_api(apiId=api_id)
        except Exception as e:
            LOG.debug("Error cleaning up AppSync API: %s, %s", api_id, e)
2144

2145

2146
@pytest.fixture
def assert_host_customisation(monkeypatch):
    """Patches LOCALSTACK_HOST to a known value and yields an asserter that checks
    whether a given URL was built from the configured (or a custom) host."""
    configured_host = "foo.bar"
    monkeypatch.setattr(
        config, "LOCALSTACK_HOST", config.HostAndPort(host=configured_host, port=8888)
    )

    def asserter(
        url: str,
        *,
        custom_host: Optional[str] = None,
    ):
        if custom_host is None:
            assert configured_host in url, f"Could not find `{configured_host}` in `{url}`"
        else:
            assert custom_host in url, f"Could not find `{custom_host}` in `{url}`"
            # the configured host must not leak into URLs built for a custom host
            assert configured_host not in url

    yield asserter
2166

2167

2168
@pytest.fixture
def echo_http_server(httpserver: HTTPServer):
    """Spins up a local HTTP echo server and returns the endpoint URL"""

    def _echo(request: Request) -> Response:
        # echo back the request's body, headers, URL and method as a JSON document
        result = {
            "data": request.data or "{}",
            "headers": dict(request.headers),
            "url": request.url,
            "method": request.method,
        }
        response_body = json.dumps(json_safe(result))
        return Response(response_body, status=200)

    # register the echo handler and hand out the server's base URL
    httpserver.expect_request("").respond_with_handler(_echo)
    http_endpoint = httpserver.url_for("/")

    return http_endpoint
2186

2187

2188
@pytest.fixture
def echo_http_server_post(echo_http_server):
    """
    Returns an HTTP echo server URL for POST requests that work both locally and for parity tests (against real AWS)
    """
    # against real AWS the local test server is not reachable - use the public echo service
    if is_aws_cloud():
        return f"{PUBLIC_HTTP_ECHO_SERVER_URL}/post"

    return f"{echo_http_server}post"
2197

2198

2199
def create_policy_doc(effect: str, actions: List, resource=None) -> Dict:
    """Build an IAM policy document containing a single statement.

    :param effect: statement effect, e.g. "Allow" or "Deny"
    :param actions: a single action string or a list of actions
    :param resource: resource(s) the statement applies to (defaults to "*")
    :return: the policy document as a dict
    """
    statement = {
        # TODO statement ids have to be alphanumeric [0-9A-Za-z], write a test for it
        "Sid": f"s{short_uid()}",
        "Effect": effect,
        "Action": ensure_list(actions),
        "Resource": resource or "*",
    }
    return {
        "Version": "2012-10-17",
        "Statement": [statement],
    }
2214

2215

2216
@pytest.fixture
def create_policy_generated_document(create_policy):
    """Factory fixture: create an IAM policy from a generated single-statement document."""

    def _create_policy_with_doc(effect, actions, policy_name=None, resource=None, iam_client=None):
        name = policy_name or f"p-{short_uid()}"
        document = create_policy_doc(effect, actions, resource=resource)
        response = create_policy(
            PolicyName=name, PolicyDocument=json.dumps(document), iam_client=iam_client
        )
        # return only the ARN of the freshly created policy
        return response["Policy"]["Arn"]

    return _create_policy_with_doc
2228

2229

2230
@pytest.fixture
def create_role_with_policy(create_role, create_policy_generated_document, aws_client):
    """Factory fixture: create a role with a generated policy, either attached as a
    managed policy or embedded inline. Returns ``(role_name, role_arn)``."""

    def _create_role_with_policy(
        effect, actions, assume_policy_doc, resource=None, attach=True, iam_client=None
    ):
        iam_client = iam_client or aws_client.iam

        role_name = f"role-{short_uid()}"
        role_arn = create_role(
            RoleName=role_name, AssumeRolePolicyDocument=assume_policy_doc, iam_client=iam_client
        )["Role"]["Arn"]
        policy_name = f"p-{short_uid()}"

        if attach:
            # managed policy: create it separately, then attach it to the role
            policy_arn = create_policy_generated_document(
                effect, actions, policy_name=policy_name, resource=resource, iam_client=iam_client
            )
            iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
        else:
            # inline policy: embed the generated document directly into the role
            iam_client.put_role_policy(
                RoleName=role_name,
                PolicyName=policy_name,
                PolicyDocument=json.dumps(create_policy_doc(effect, actions, resource=resource)),
            )

        return role_name, role_arn

    return _create_role_with_policy
2261

2262

2263
@pytest.fixture
def create_user_with_policy(create_policy_generated_document, create_user, aws_client):
    """Factory fixture: create an IAM user with an attached generated policy and access keys."""

    def _create_user_with_policy(effect, actions, resource=None):
        policy_arn = create_policy_generated_document(effect, actions, resource=resource)
        username = f"user-{short_uid()}"
        create_user(UserName=username)
        aws_client.iam.attach_user_policy(UserName=username, PolicyArn=policy_arn)
        access_key = aws_client.iam.create_access_key(UserName=username)["AccessKey"]
        # return the user name together with its freshly created access key pair
        return username, access_key

    return _create_user_with_policy
2274

2275

2276
@pytest.fixture()
def register_extension(s3_bucket, aws_client):
    """Factory fixture registering CloudFormation extensions (resource types) from a
    local artifact file; deregisters all registered versions on teardown."""
    cfn_client = aws_client.cloudformation
    extensions_arns = []

    def _register(extension_name, extension_type, artifact_path):
        # upload the schema handler package to S3 first; register_type reads it from there
        bucket = s3_bucket
        key = f"artifact-{short_uid()}"

        aws_client.s3.upload_file(artifact_path, bucket, key)

        register_response = cfn_client.register_type(
            Type=extension_type,
            TypeName=extension_name,
            SchemaHandlerPackage=f"s3://{bucket}/{key}",
        )

        # registration is asynchronous - wait until it has completed
        registration_token = register_response["RegistrationToken"]
        cfn_client.get_waiter("type_registration_complete").wait(
            RegistrationToken=registration_token
        )

        describe_response = cfn_client.describe_type_registration(
            RegistrationToken=registration_token
        )

        extensions_arns.append(describe_response["TypeArn"])
        cfn_client.set_type_default_version(Arn=describe_response["TypeVersionArn"])

        return describe_response

    yield _register

    # teardown: best-effort deregistration of every version, then the type itself
    for arn in extensions_arns:
        versions = cfn_client.list_type_versions(Arn=arn)["TypeVersionSummaries"]
        for v in versions:
            try:
                cfn_client.deregister_type(Arn=v["Arn"])
            except Exception:
                continue
        cfn_client.deregister_type(Arn=arn)
2317

2318

2319
@pytest.fixture
def hosted_zone(aws_client):
    """Factory fixture creating Route53 hosted zones; deletes them (LIFO order) on teardown."""
    created_zone_ids = []

    def factory(**kwargs):
        if "CallerReference" not in kwargs:
            kwargs["CallerReference"] = f"ref-{short_uid()}"
        response = aws_client.route53.create_hosted_zone(**kwargs)
        created_zone_ids.append(response["HostedZone"]["Id"])
        return response

    yield factory

    # delete in reverse creation order
    for zone_id in reversed(created_zone_ids):
        aws_client.route53.delete_hosted_zone(Id=zone_id)
2335

2336

2337
@pytest.fixture
def openapi_validate(monkeypatch):
    # enable OpenAPI request and response validation for the duration of a test
    monkeypatch.setattr(config, "OPENAPI_VALIDATE_RESPONSE", "true")
    monkeypatch.setattr(config, "OPENAPI_VALIDATE_REQUEST", "true")
2341

2342

2343
@pytest.fixture
def set_resource_custom_id():
    """Factory fixture assigning custom IDs to resources via the localstack ID manager;
    all custom IDs set through it are unset again on teardown."""
    set_ids = []

    def _set_custom_id(resource_identifier: ResourceIdentifier, custom_id):
        localstack_id_manager.set_custom_id(
            resource_identifier=resource_identifier, custom_id=custom_id
        )
        set_ids.append(resource_identifier)

    yield _set_custom_id

    for resource_identifier in set_ids:
        localstack_id_manager.unset_custom_id(resource_identifier)
2357

2358

2359
###############################
2360
# Events (EventBridge) fixtures
2361
###############################
2362

2363

2364
@pytest.fixture
def events_create_event_bus(aws_client, region_name, account_id):
    """Factory fixture creating EventBridge event buses.

    Teardown removes, per bus: all rules (including their targets), all archives
    associated with the bus, and finally the bus itself. Failures are logged
    as warnings rather than raised.
    """
    event_bus_names = []

    def _create_event_bus(**kwargs):
        if "Name" not in kwargs:
            kwargs["Name"] = f"test-event-bus-{short_uid()}"

        response = aws_client.events.create_event_bus(**kwargs)
        event_bus_names.append(kwargs["Name"])
        return response

    yield _create_event_bus

    for event_bus_name in event_bus_names:
        try:
            response = aws_client.events.list_rules(EventBusName=event_bus_name)
            rules = [rule["Name"] for rule in response["Rules"]]

            # Delete all rules for the current event bus
            for rule in rules:
                try:
                    response = aws_client.events.list_targets_by_rule(
                        Rule=rule, EventBusName=event_bus_name
                    )
                    targets = [target["Id"] for target in response["Targets"]]

                    # Remove all targets for the current rule
                    if targets:
                        for target in targets:
                            aws_client.events.remove_targets(
                                Rule=rule, EventBusName=event_bus_name, Ids=[target]
                            )

                    aws_client.events.delete_rule(Name=rule, EventBusName=event_bus_name)
                except Exception as e:
                    LOG.warning("Failed to delete rule %s: %s", rule, e)

            # Delete archives for event bus
            event_source_arn = (
                f"arn:aws:events:{region_name}:{account_id}:event-bus/{event_bus_name}"
            )
            response = aws_client.events.list_archives(EventSourceArn=event_source_arn)
            archives = [archive["ArchiveName"] for archive in response["Archives"]]
            for archive in archives:
                try:
                    aws_client.events.delete_archive(ArchiveName=archive)
                except Exception as e:
                    LOG.warning("Failed to delete archive %s: %s", archive, e)

            aws_client.events.delete_event_bus(Name=event_bus_name)
        except Exception as e:
            LOG.warning("Failed to delete event bus %s: %s", event_bus_name, e)
2417

2418

2419
@pytest.fixture
def events_put_rule(aws_client):
    """Factory fixture wrapping events.put_rule; deletes rules (and their targets) on teardown."""
    rules = []

    def _put_rule(**kwargs):
        if "Name" not in kwargs:
            kwargs["Name"] = f"rule-{short_uid()}"

        response = aws_client.events.put_rule(**kwargs)
        # remember the bus as well - deletion needs the rule/bus pair
        rules.append((kwargs["Name"], kwargs.get("EventBusName", "default")))
        return response

    yield _put_rule

    for rule, event_bus_name in rules:
        try:
            response = aws_client.events.list_targets_by_rule(
                Rule=rule, EventBusName=event_bus_name
            )
            targets = [target["Id"] for target in response["Targets"]]

            # Remove all targets for the current rule
            if targets:
                for target in targets:
                    aws_client.events.remove_targets(
                        Rule=rule, EventBusName=event_bus_name, Ids=[target]
                    )

            aws_client.events.delete_rule(Name=rule, EventBusName=event_bus_name)
        except Exception as e:
            LOG.warning("Failed to delete rule %s: %s", rule, e)
2450

2451

2452
@pytest.fixture
def events_create_rule(aws_client):
    """Factory fixture that creates an EventBridge rule and returns its ARN.

    The factory expects ``Name`` and optionally ``EventBusName``,
    ``EventPattern`` (dict, serialized to JSON here) and ``ScheduleExpression``.
    All created rules — including any targets attached to them later — are
    cleaned up on teardown. Cleanup is best effort: a failure for one rule is
    logged and must not prevent cleanup of the remaining rules (consistent
    with the ``events_put_rule`` fixture).
    """
    rules = []

    def _create_rule(**kwargs):
        rule_name = kwargs["Name"]
        bus_name = kwargs.get("EventBusName", "")
        pattern = kwargs.get("EventPattern", {})
        schedule = kwargs.get("ScheduleExpression", "")
        rule_arn = aws_client.events.put_rule(
            Name=rule_name,
            EventBusName=bus_name,
            EventPattern=json.dumps(pattern),
            ScheduleExpression=schedule,
        )["RuleArn"]
        rules.append({"name": rule_name, "bus": bus_name})
        return rule_arn

    yield _create_rule

    for rule in rules:
        # Wrap each rule's cleanup so one failure does not abort the teardown
        # loop (previously an exception here left later rules undeleted).
        try:
            targets = aws_client.events.list_targets_by_rule(
                Rule=rule["name"], EventBusName=rule["bus"]
            )["Targets"]

            # Targets must be removed before the rule itself can be deleted.
            target_ids = [target["Id"] for target in targets]
            if target_ids:
                aws_client.events.remove_targets(
                    Rule=rule["name"], EventBusName=rule["bus"], Ids=target_ids
                )

            aws_client.events.delete_rule(Name=rule["name"], EventBusName=rule["bus"])
        except Exception as e:
            LOG.warning("Failed to delete rule %s: %s", rule["name"], e)
2484

2485

2486
@pytest.fixture
def sqs_as_events_target(aws_client, sqs_get_queue_arn):
    """Factory fixture creating an SQS queue suitable as an EventBridge target.

    The queue's resource policy grants ``events.amazonaws.com`` permission to
    send messages to it. Returns ``(queue_url, queue_arn)``. Created queues are
    deleted on teardown (best effort, failures only logged at debug level).
    """
    created_queue_urls = []

    def _sqs_as_events_target(queue_name: str | None = None) -> tuple[str, str]:
        name = queue_name or f"tests-queue-{short_uid()}"
        queue_url = aws_client.sqs.create_queue(QueueName=name)["QueueUrl"]
        created_queue_urls.append(queue_url)

        queue_arn = sqs_get_queue_arn(queue_url)
        # Allow EventBridge to deliver events into this queue.
        policy = {
            "Version": "2012-10-17",
            "Id": f"sqs-eventbridge-{short_uid()}",
            "Statement": [
                {
                    "Sid": f"SendMessage-{short_uid()}",
                    "Effect": "Allow",
                    "Principal": {"Service": "events.amazonaws.com"},
                    "Action": "sqs:SendMessage",
                    "Resource": queue_arn,
                }
            ],
        }
        aws_client.sqs.set_queue_attributes(
            QueueUrl=queue_url, Attributes={"Policy": json.dumps(policy)}
        )
        return queue_url, queue_arn

    yield _sqs_as_events_target

    for queue_url in created_queue_urls:
        try:
            aws_client.sqs.delete_queue(QueueUrl=queue_url)
        except Exception as e:
            LOG.debug("error cleaning up queue %s: %s", queue_url, e)
2522

2523

2524
@pytest.fixture
def clean_up(
    aws_client,
):  # TODO: legacy clean up fixtures for eventbridge - remove and use individual fixtures for creating resources instead
    def _clean_up(
        bus_name=None,
        rule_name=None,
        target_ids=None,
        queue_url=None,
        log_group_name=None,
    ):
        """Best-effort removal of EventBridge / SQS / CloudWatch Logs test resources.

        Every deletion goes through ``call_safe`` so individual failures never
        raise out of a test's teardown.
        """
        events_client = aws_client.events
        # Only pass EventBusName when a bus was given (default bus otherwise).
        bus_kwargs = {"EventBusName": bus_name} if bus_name else {}

        if target_ids:
            # Accept a single id or a list of ids.
            ids = target_ids if isinstance(target_ids, list) else [target_ids]
            call_safe(
                events_client.remove_targets,
                kwargs={"Rule": rule_name, "Ids": ids, "Force": True, **bus_kwargs},
            )
        if rule_name:
            call_safe(
                events_client.delete_rule,
                kwargs={"Name": rule_name, "Force": True, **bus_kwargs},
            )
        if bus_name:
            call_safe(events_client.delete_event_bus, kwargs={"Name": bus_name})
        if queue_url:
            call_safe(aws_client.sqs.delete_queue, kwargs={"QueueUrl": queue_url})
        if log_group_name:
            logs_client = aws_client.logs

            def _delete_log_group():
                # Log streams must be deleted before the group itself.
                streams = logs_client.describe_log_streams(logGroupName=log_group_name)
                for stream in streams["logStreams"]:
                    logs_client.delete_log_stream(
                        logGroupName=log_group_name,
                        logStreamName=stream["logStreamName"],
                    )
                logs_client.delete_log_group(logGroupName=log_group_name)

            call_safe(_delete_log_group)

    yield _clean_up
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc