• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 99d68267-703b-430f-8888-adccf51e571d

18 Apr 2025 08:38AM UTC coverage: 86.269% (-0.01%) from 86.279%
99d68267-703b-430f-8888-adccf51e571d

push

circleci

web-flow
apply fix for podman container labels dict (#12526)

Co-authored-by: Tjeerd Ritsma <tjeerd@playgroundtech.io>

2 of 3 new or added lines in 1 file covered. (66.67%)

28 existing lines in 12 files now uncovered.

63883 of 74051 relevant lines covered (86.27%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.35
/localstack-core/localstack/testing/pytest/fixtures.py
1
import contextlib
1✔
2
import dataclasses
1✔
3
import json
1✔
4
import logging
1✔
5
import os
1✔
6
import re
1✔
7
import textwrap
1✔
8
import time
1✔
9
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple
1✔
10

11
import botocore.auth
1✔
12
import botocore.config
1✔
13
import botocore.credentials
1✔
14
import botocore.session
1✔
15
import pytest
1✔
16
from _pytest.config import Config
1✔
17
from _pytest.nodes import Item
1✔
18
from botocore.exceptions import ClientError
1✔
19
from botocore.regions import EndpointResolver
1✔
20
from pytest_httpserver import HTTPServer
1✔
21
from werkzeug import Request, Response
1✔
22

23
from localstack import config
1✔
24
from localstack.aws.connect import ServiceLevelClientFactory
1✔
25
from localstack.services.stores import (
1✔
26
    AccountRegionBundle,
27
    BaseStore,
28
    CrossAccountAttribute,
29
    CrossRegionAttribute,
30
    LocalAttribute,
31
)
32
from localstack.testing.aws.cloudformation_utils import load_template_file, render_template
1✔
33
from localstack.testing.aws.util import get_lambda_logs, is_aws_cloud
1✔
34
from localstack.testing.config import (
1✔
35
    SECONDARY_TEST_AWS_ACCOUNT_ID,
36
    SECONDARY_TEST_AWS_REGION_NAME,
37
    TEST_AWS_ACCOUNT_ID,
38
    TEST_AWS_REGION_NAME,
39
)
40
from localstack.utils import testutil
1✔
41
from localstack.utils.aws.arns import get_partition
1✔
42
from localstack.utils.aws.client import SigningHttpClient
1✔
43
from localstack.utils.aws.resources import create_dynamodb_table
1✔
44
from localstack.utils.bootstrap import is_api_enabled
1✔
45
from localstack.utils.collections import ensure_list
1✔
46
from localstack.utils.functions import call_safe, run_safe
1✔
47
from localstack.utils.http import safe_requests as requests
1✔
48
from localstack.utils.id_generator import ResourceIdentifier, localstack_id_manager
1✔
49
from localstack.utils.json import CustomEncoder, json_safe
1✔
50
from localstack.utils.net import wait_for_port_open
1✔
51
from localstack.utils.strings import short_uid, to_str
1✔
52
from localstack.utils.sync import ShortCircuitWaitException, poll_condition, retry, wait_until
1✔
53

54
LOG = logging.getLogger(__name__)
1✔
55

56
# URL of public HTTP echo server, used primarily for AWS parity/snapshot testing
57
PUBLIC_HTTP_ECHO_SERVER_URL = "http://httpbin.org"
1✔
58

59
WAITER_CHANGE_SET_CREATE_COMPLETE = "change_set_create_complete"
1✔
60
WAITER_STACK_CREATE_COMPLETE = "stack_create_complete"
1✔
61
WAITER_STACK_UPDATE_COMPLETE = "stack_update_complete"
1✔
62
WAITER_STACK_DELETE_COMPLETE = "stack_delete_complete"
1✔
63

64

65
if TYPE_CHECKING:
1✔
66
    from mypy_boto3_sqs import SQSClient
×
67
    from mypy_boto3_sqs.type_defs import MessageTypeDef
×
68

69

70
@pytest.fixture(scope="session")
1✔
71
def aws_client_no_retry(aws_client_factory):
1✔
72
    """
73
    This fixture can be used to obtain Boto clients with disabled retries for testing.
74
    botocore docs: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#configuring-a-retry-mode
75

76
    Use this client when testing exceptions (i.e., with pytest.raises(...)) or expected errors (e.g., status code 500)
77
    to avoid unnecessary retries and mitigate test flakiness if the tested error condition is time-bound.
78

79
    This client is needed for the following errors, exceptions, and HTTP status codes defined by the legacy retry mode:
80
    https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#legacy-retry-mode
81
    General socket/connection errors:
82
    * ConnectionError
83
    * ConnectionClosedError
84
    * ReadTimeoutError
85
    * EndpointConnectionError
86

87
    Service-side throttling/limit errors and exceptions:
88
    * Throttling
89
    * ThrottlingException
90
    * ThrottledException
91
    * RequestThrottledException
92
    * ProvisionedThroughputExceededException
93

94
    HTTP status codes: 429, 500, 502, 503, 504, and 509
95

96
    Hence, this client is not needed for a `ResourceNotFound` error (but it doesn't harm).
97
    """
98
    no_retry_config = botocore.config.Config(retries={"max_attempts": 1})
1✔
99
    return aws_client_factory(config=no_retry_config)
1✔
100

101

102
@pytest.fixture(scope="class")
1✔
103
def aws_http_client_factory(aws_session):
1✔
104
    """
105
    Returns a factory for creating new ``SigningHttpClient`` instances using a configurable botocore request signer.
106
    The default signer is a SigV4QueryAuth. The credentials are extracted from the ``boto3_sessions`` fixture that
107
    transparently uses your global profile when TEST_TARGET=AWS_CLOUD, or test credentials when running against
108
    LocalStack.
109

110
    Example invocations
111

112
        client = aws_signing_http_client_factory("sqs")
113
        client.get("http://localhost:4566/000000000000/my-queue")
114

115
    or
116
        client = aws_signing_http_client_factory("dynamodb", signer_factory=SigV4Auth)
117
        client.post("...")
118
    """
119

120
    def factory(
1✔
121
        service: str,
122
        region: str = None,
123
        signer_factory: Callable[
124
            [botocore.credentials.Credentials, str, str], botocore.auth.BaseSigner
125
        ] = botocore.auth.SigV4QueryAuth,
126
        endpoint_url: str = None,
127
        aws_access_key_id: str = None,
128
        aws_secret_access_key: str = None,
129
    ):
130
        region = region or TEST_AWS_REGION_NAME
1✔
131

132
        if aws_access_key_id or aws_secret_access_key:
1✔
133
            credentials = botocore.credentials.Credentials(
1✔
134
                access_key=aws_access_key_id, secret_key=aws_secret_access_key
135
            )
136
        else:
137
            credentials = aws_session.get_credentials()
1✔
138

139
        creds = credentials.get_frozen_credentials()
1✔
140

141
        if not endpoint_url:
1✔
142
            if is_aws_cloud():
1✔
143
                # FIXME: this is a bit raw. we should probably re-use boto in a better way
144
                resolver: EndpointResolver = aws_session._session.get_component("endpoint_resolver")
×
145
                endpoint_url = "https://" + resolver.construct_endpoint(service, region)["hostname"]
×
146
            else:
147
                endpoint_url = config.internal_service_url()
1✔
148

149
        return SigningHttpClient(signer_factory(creds, service, region), endpoint_url=endpoint_url)
1✔
150

151
    return factory
1✔
152

153

154
@pytest.fixture(scope="class")
1✔
155
def s3_vhost_client(aws_client_factory, region_name):
1✔
156
    return aws_client_factory(
1✔
157
        config=botocore.config.Config(s3={"addressing_style": "virtual"}), region_name=region_name
158
    ).s3
159

160

161
@pytest.fixture
1✔
162
def dynamodb_wait_for_table_active(aws_client):
1✔
163
    def wait_for_table_active(table_name: str, client=None):
1✔
164
        def wait():
1✔
165
            return (client or aws_client.dynamodb).describe_table(TableName=table_name)["Table"][
1✔
166
                "TableStatus"
167
            ] == "ACTIVE"
168

169
        poll_condition(wait, timeout=30)
1✔
170

171
    return wait_for_table_active
1✔
172

173

174
@pytest.fixture
1✔
175
def dynamodb_create_table_with_parameters(dynamodb_wait_for_table_active, aws_client):
1✔
176
    tables = []
1✔
177

178
    def factory(**kwargs):
1✔
179
        if "TableName" not in kwargs:
1✔
180
            kwargs["TableName"] = f"test-table-{short_uid()}"
×
181

182
        tables.append(kwargs["TableName"])
1✔
183
        response = aws_client.dynamodb.create_table(**kwargs)
1✔
184
        dynamodb_wait_for_table_active(kwargs["TableName"])
1✔
185
        return response
1✔
186

187
    yield factory
1✔
188

189
    # cleanup
190
    for table in tables:
1✔
191
        try:
1✔
192
            # table has to be in ACTIVE state before deletion
193
            dynamodb_wait_for_table_active(table)
1✔
194
            aws_client.dynamodb.delete_table(TableName=table)
1✔
195
        except Exception as e:
1✔
196
            LOG.debug("error cleaning up table %s: %s", table, e)
1✔
197

198

199
@pytest.fixture
1✔
200
def dynamodb_create_table(dynamodb_wait_for_table_active, aws_client):
1✔
201
    # beware, this swallows exception in create_dynamodb_table utility function
202
    tables = []
1✔
203

204
    def factory(**kwargs):
1✔
205
        kwargs["client"] = aws_client.dynamodb
1✔
206
        if "table_name" not in kwargs:
1✔
207
            kwargs["table_name"] = f"test-table-{short_uid()}"
1✔
208
        if "partition_key" not in kwargs:
1✔
209
            kwargs["partition_key"] = "id"
1✔
210

211
        tables.append(kwargs["table_name"])
1✔
212

213
        return create_dynamodb_table(**kwargs)
1✔
214

215
    yield factory
1✔
216

217
    # cleanup
218
    for table in tables:
1✔
219
        try:
1✔
220
            # table has to be in ACTIVE state before deletion
221
            dynamodb_wait_for_table_active(table)
1✔
222
            aws_client.dynamodb.delete_table(TableName=table)
1✔
223
        except Exception as e:
1✔
224
            LOG.debug("error cleaning up table %s: %s", table, e)
1✔
225

226

227
@pytest.fixture
1✔
228
def s3_create_bucket(s3_empty_bucket, aws_client):
1✔
229
    buckets = []
1✔
230

231
    def factory(**kwargs) -> str:
1✔
232
        if "Bucket" not in kwargs:
1✔
233
            kwargs["Bucket"] = "test-bucket-%s" % short_uid()
1✔
234

235
        if (
1✔
236
            "CreateBucketConfiguration" not in kwargs
237
            and aws_client.s3.meta.region_name != "us-east-1"
238
        ):
UNCOV
239
            kwargs["CreateBucketConfiguration"] = {
×
240
                "LocationConstraint": aws_client.s3.meta.region_name
241
            }
242

243
        aws_client.s3.create_bucket(**kwargs)
1✔
244
        buckets.append(kwargs["Bucket"])
1✔
245
        return kwargs["Bucket"]
1✔
246

247
    yield factory
1✔
248

249
    # cleanup
250
    for bucket in buckets:
1✔
251
        try:
1✔
252
            s3_empty_bucket(bucket)
1✔
253
            aws_client.s3.delete_bucket(Bucket=bucket)
1✔
254
        except Exception as e:
1✔
255
            LOG.debug("error cleaning up bucket %s: %s", bucket, e)
1✔
256

257

258
@pytest.fixture
1✔
259
def s3_create_bucket_with_client(s3_empty_bucket, aws_client):
1✔
260
    buckets = []
1✔
261

262
    def factory(s3_client, **kwargs) -> str:
1✔
263
        if "Bucket" not in kwargs:
1✔
264
            kwargs["Bucket"] = f"test-bucket-{short_uid()}"
×
265

266
        response = s3_client.create_bucket(**kwargs)
1✔
267
        buckets.append(kwargs["Bucket"])
1✔
268
        return response
1✔
269

270
    yield factory
1✔
271

272
    # cleanup
273
    for bucket in buckets:
1✔
274
        try:
1✔
275
            s3_empty_bucket(bucket)
1✔
276
            aws_client.s3.delete_bucket(Bucket=bucket)
1✔
277
        except Exception as e:
×
278
            LOG.debug("error cleaning up bucket %s: %s", bucket, e)
×
279

280

281
@pytest.fixture
1✔
282
def s3_bucket(s3_create_bucket, aws_client) -> str:
1✔
283
    region = aws_client.s3.meta.region_name
1✔
284
    kwargs = {}
1✔
285
    if region != "us-east-1":
1✔
UNCOV
286
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
×
287
    return s3_create_bucket(**kwargs)
1✔
288

289

290
@pytest.fixture
1✔
291
def s3_empty_bucket(aws_client):
1✔
292
    """
293
    Returns a factory that given a bucket name, deletes all objects and deletes all object versions
294
    """
295

296
    # Boto resource would make this a straightforward task, but our internal client does not support Boto resource
297
    # FIXME: this won't work when bucket has more than 1000 objects
298
    def factory(bucket_name: str):
1✔
299
        kwargs = {}
1✔
300
        try:
1✔
301
            aws_client.s3.get_object_lock_configuration(Bucket=bucket_name)
1✔
302
            kwargs["BypassGovernanceRetention"] = True
1✔
303
        except ClientError:
1✔
304
            pass
1✔
305

306
        response = aws_client.s3.list_objects_v2(Bucket=bucket_name)
1✔
307
        objects = [{"Key": obj["Key"]} for obj in response.get("Contents", [])]
1✔
308
        if objects:
1✔
309
            aws_client.s3.delete_objects(
1✔
310
                Bucket=bucket_name,
311
                Delete={"Objects": objects},
312
                **kwargs,
313
            )
314

315
        response = aws_client.s3.list_object_versions(Bucket=bucket_name)
1✔
316
        versions = response.get("Versions", [])
1✔
317
        versions.extend(response.get("DeleteMarkers", []))
1✔
318

319
        object_versions = [{"Key": obj["Key"], "VersionId": obj["VersionId"]} for obj in versions]
1✔
320
        if object_versions:
1✔
321
            aws_client.s3.delete_objects(
1✔
322
                Bucket=bucket_name,
323
                Delete={"Objects": object_versions},
324
                **kwargs,
325
            )
326

327
    yield factory
1✔
328

329

330
@pytest.fixture
1✔
331
def sqs_create_queue(aws_client):
1✔
332
    queue_urls = []
1✔
333

334
    def factory(**kwargs):
1✔
335
        if "QueueName" not in kwargs:
1✔
336
            kwargs["QueueName"] = "test-queue-%s" % short_uid()
1✔
337

338
        response = aws_client.sqs.create_queue(**kwargs)
1✔
339
        url = response["QueueUrl"]
1✔
340
        queue_urls.append(url)
1✔
341

342
        return url
1✔
343

344
    yield factory
1✔
345

346
    # cleanup
347
    for queue_url in queue_urls:
1✔
348
        try:
1✔
349
            aws_client.sqs.delete_queue(QueueUrl=queue_url)
1✔
350
        except Exception as e:
1✔
351
            LOG.debug("error cleaning up queue %s: %s", queue_url, e)
1✔
352

353

354
@pytest.fixture
1✔
355
def sqs_receive_messages_delete(aws_client):
1✔
356
    def factory(
1✔
357
        queue_url: str,
358
        expected_messages: Optional[int] = None,
359
        wait_time: Optional[int] = 5,
360
    ):
361
        response = aws_client.sqs.receive_message(
1✔
362
            QueueUrl=queue_url,
363
            MessageAttributeNames=["All"],
364
            VisibilityTimeout=0,
365
            WaitTimeSeconds=wait_time,
366
        )
367
        messages = []
1✔
368
        for m in response["Messages"]:
1✔
369
            message = json.loads(to_str(m["Body"]))
1✔
370
            messages.append(message)
1✔
371

372
        if expected_messages is not None:
1✔
373
            assert len(messages) == expected_messages
×
374

375
        for message in response["Messages"]:
1✔
376
            aws_client.sqs.delete_message(
1✔
377
                QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"]
378
            )
379

380
        return messages
1✔
381

382
    return factory
1✔
383

384

385
@pytest.fixture
1✔
386
def sqs_receive_num_messages(sqs_receive_messages_delete):
1✔
387
    def factory(queue_url: str, expected_messages: int, max_iterations: int = 3):
1✔
388
        all_messages = []
1✔
389
        for _ in range(max_iterations):
1✔
390
            try:
1✔
391
                messages = sqs_receive_messages_delete(queue_url, wait_time=5)
1✔
392
            except KeyError:
×
393
                # there were no messages
394
                continue
×
395
            all_messages.extend(messages)
1✔
396

397
            if len(all_messages) >= expected_messages:
1✔
398
                return all_messages[:expected_messages]
1✔
399

400
        raise AssertionError(f"max iterations reached with {len(all_messages)} messages received")
×
401

402
    return factory
1✔
403

404

405
@pytest.fixture
1✔
406
def sqs_collect_messages(aws_client):
1✔
407
    """Collects SQS messages from a given queue_url and deletes them by default.
408
    Example usage:
409
    messages = sqs_collect_messages(
410
         my_queue_url,
411
         expected=2,
412
         timeout=10,
413
         attribute_names=["All"],
414
         message_attribute_names=["All"],
415
    )
416
    """
417

418
    def factory(
1✔
419
        queue_url: str,
420
        expected: int,
421
        timeout: int,
422
        delete: bool = True,
423
        attribute_names: list[str] = None,
424
        message_attribute_names: list[str] = None,
425
        max_number_of_messages: int = 1,
426
        wait_time_seconds: int = 5,
427
        sqs_client: "SQSClient | None" = None,
428
    ) -> list["MessageTypeDef"]:
429
        sqs_client = sqs_client or aws_client.sqs
1✔
430
        collected = []
1✔
431

432
        def _receive():
1✔
433
            response = sqs_client.receive_message(
1✔
434
                QueueUrl=queue_url,
435
                # Maximum is 20 seconds. Performs long polling.
436
                WaitTimeSeconds=wait_time_seconds,
437
                # Maximum 10 messages
438
                MaxNumberOfMessages=max_number_of_messages,
439
                AttributeNames=attribute_names or [],
440
                MessageAttributeNames=message_attribute_names or [],
441
            )
442

443
            if messages := response.get("Messages"):
1✔
444
                collected.extend(messages)
1✔
445

446
                if delete:
1✔
447
                    for m in messages:
1✔
448
                        sqs_client.delete_message(
1✔
449
                            QueueUrl=queue_url, ReceiptHandle=m["ReceiptHandle"]
450
                        )
451

452
            return len(collected) >= expected
1✔
453

454
        if not poll_condition(_receive, timeout=timeout):
1✔
455
            raise TimeoutError(
×
456
                f"gave up waiting for messages (expected={expected}, actual={len(collected)}"
457
            )
458

459
        return collected
1✔
460

461
    yield factory
1✔
462

463

464
@pytest.fixture
1✔
465
def sqs_queue(sqs_create_queue):
1✔
466
    return sqs_create_queue()
1✔
467

468

469
@pytest.fixture
1✔
470
def sqs_get_queue_arn(aws_client) -> Callable:
1✔
471
    def _get_queue_arn(queue_url: str) -> str:
1✔
472
        return aws_client.sqs.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["QueueArn"])[
1✔
473
            "Attributes"
474
        ]["QueueArn"]
475

476
    return _get_queue_arn
1✔
477

478

479
@pytest.fixture
1✔
480
def sqs_queue_exists(aws_client):
1✔
481
    def _queue_exists(queue_url: str) -> bool:
1✔
482
        """
483
        Checks whether a queue with the given queue URL exists.
484
        :param queue_url: the queue URL
485
        :return: true if the queue exists, false otherwise
486
        """
487
        try:
1✔
488
            result = aws_client.sqs.get_queue_url(QueueName=queue_url.split("/")[-1])
1✔
489
            return result.get("QueueUrl") == queue_url
×
490
        except ClientError as e:
1✔
491
            if "NonExistentQueue" in e.response["Error"]["Code"]:
1✔
492
                return False
1✔
493
            raise
×
494

495
    yield _queue_exists
1✔
496

497

498
@pytest.fixture
1✔
499
def sns_create_topic(aws_client):
1✔
500
    topic_arns = []
1✔
501

502
    def _create_topic(**kwargs):
1✔
503
        if "Name" not in kwargs:
1✔
504
            kwargs["Name"] = "test-topic-%s" % short_uid()
1✔
505
        response = aws_client.sns.create_topic(**kwargs)
1✔
506
        topic_arns.append(response["TopicArn"])
1✔
507
        return response
1✔
508

509
    yield _create_topic
1✔
510

511
    for topic_arn in topic_arns:
1✔
512
        try:
1✔
513
            aws_client.sns.delete_topic(TopicArn=topic_arn)
1✔
514
        except Exception as e:
×
515
            LOG.debug("error cleaning up topic %s: %s", topic_arn, e)
×
516

517

518
@pytest.fixture
1✔
519
def sns_wait_for_topic_delete(aws_client):
1✔
520
    def wait_for_topic_delete(topic_arn: str) -> None:
1✔
521
        def wait():
1✔
522
            try:
1✔
523
                aws_client.sns.get_topic_attributes(TopicArn=topic_arn)
1✔
524
                return False
×
525
            except Exception as e:
1✔
526
                if "NotFound" in e.response["Error"]["Code"]:
1✔
527
                    return True
1✔
528

529
                raise
×
530

531
        poll_condition(wait, timeout=30)
1✔
532

533
    return wait_for_topic_delete
1✔
534

535

536
@pytest.fixture
1✔
537
def sns_subscription(aws_client):
1✔
538
    sub_arns = []
1✔
539

540
    def _create_sub(**kwargs):
1✔
541
        if kwargs.get("ReturnSubscriptionArn") is None:
1✔
542
            kwargs["ReturnSubscriptionArn"] = True
1✔
543

544
        # requires 'TopicArn', 'Protocol', and 'Endpoint'
545
        response = aws_client.sns.subscribe(**kwargs)
1✔
546
        sub_arn = response["SubscriptionArn"]
1✔
547
        sub_arns.append(sub_arn)
1✔
548
        return response
1✔
549

550
    yield _create_sub
1✔
551

552
    for sub_arn in sub_arns:
1✔
553
        try:
1✔
554
            aws_client.sns.unsubscribe(SubscriptionArn=sub_arn)
1✔
555
        except Exception as e:
1✔
556
            LOG.debug("error cleaning up subscription %s: %s", sub_arn, e)
1✔
557

558

559
@pytest.fixture
1✔
560
def sns_topic(sns_create_topic, aws_client):
1✔
561
    topic_arn = sns_create_topic()["TopicArn"]
1✔
562
    return aws_client.sns.get_topic_attributes(TopicArn=topic_arn)
1✔
563

564

565
@pytest.fixture
1✔
566
def sns_allow_topic_sqs_queue(aws_client):
1✔
567
    def _allow_sns_topic(sqs_queue_url, sqs_queue_arn, sns_topic_arn) -> None:
1✔
568
        # allow topic to write to sqs queue
569
        aws_client.sqs.set_queue_attributes(
1✔
570
            QueueUrl=sqs_queue_url,
571
            Attributes={
572
                "Policy": json.dumps(
573
                    {
574
                        "Statement": [
575
                            {
576
                                "Effect": "Allow",
577
                                "Principal": {"Service": "sns.amazonaws.com"},
578
                                "Action": "sqs:SendMessage",
579
                                "Resource": sqs_queue_arn,
580
                                "Condition": {"ArnEquals": {"aws:SourceArn": sns_topic_arn}},
581
                            }
582
                        ]
583
                    }
584
                )
585
            },
586
        )
587

588
    return _allow_sns_topic
1✔
589

590

591
@pytest.fixture
1✔
592
def sns_create_sqs_subscription(sns_allow_topic_sqs_queue, sqs_get_queue_arn, aws_client):
1✔
593
    subscriptions = []
1✔
594

595
    def _factory(topic_arn: str, queue_url: str, **kwargs) -> Dict[str, str]:
1✔
596
        queue_arn = sqs_get_queue_arn(queue_url)
1✔
597

598
        # connect sns topic to sqs
599
        subscription = aws_client.sns.subscribe(
1✔
600
            TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn, **kwargs
601
        )
602
        subscription_arn = subscription["SubscriptionArn"]
1✔
603

604
        # allow topic to write to sqs queue
605
        sns_allow_topic_sqs_queue(
1✔
606
            sqs_queue_url=queue_url, sqs_queue_arn=queue_arn, sns_topic_arn=topic_arn
607
        )
608

609
        subscriptions.append(subscription_arn)
1✔
610
        return aws_client.sns.get_subscription_attributes(SubscriptionArn=subscription_arn)[
1✔
611
            "Attributes"
612
        ]
613

614
    yield _factory
1✔
615

616
    for arn in subscriptions:
1✔
617
        try:
1✔
618
            aws_client.sns.unsubscribe(SubscriptionArn=arn)
1✔
619
        except Exception as e:
×
620
            LOG.error("error cleaning up subscription %s: %s", arn, e)
×
621

622

623
@pytest.fixture
1✔
624
def sns_create_http_endpoint(sns_create_topic, sns_subscription, aws_client):
1✔
625
    # This fixture can be used with manual setup to expose the HTTPServer fixture to AWS. One example is to use a
626
    # a service like localhost.run, and set up a specific port to start the `HTTPServer(port=40000)` for example,
627
    # and tunnel `localhost:40000` to a specific domain that you can manually return from this fixture.
628
    http_servers = []
1✔
629

630
    def _create_http_endpoint(
1✔
631
        raw_message_delivery: bool = False,
632
    ) -> Tuple[str, str, str, HTTPServer]:
633
        server = HTTPServer()
1✔
634
        server.start()
1✔
635
        http_servers.append(server)
1✔
636
        server.expect_request("/sns-endpoint").respond_with_data(status=200)
1✔
637
        endpoint_url = server.url_for("/sns-endpoint")
1✔
638
        wait_for_port_open(endpoint_url)
1✔
639

640
        topic_arn = sns_create_topic()["TopicArn"]
1✔
641
        subscription = sns_subscription(TopicArn=topic_arn, Protocol="http", Endpoint=endpoint_url)
1✔
642
        subscription_arn = subscription["SubscriptionArn"]
1✔
643
        delivery_policy = {
1✔
644
            "healthyRetryPolicy": {
645
                "minDelayTarget": 1,
646
                "maxDelayTarget": 1,
647
                "numRetries": 0,
648
                "numNoDelayRetries": 0,
649
                "numMinDelayRetries": 0,
650
                "numMaxDelayRetries": 0,
651
                "backoffFunction": "linear",
652
            },
653
            "sicklyRetryPolicy": None,
654
            "throttlePolicy": {"maxReceivesPerSecond": 1000},
655
            "guaranteed": False,
656
        }
657
        aws_client.sns.set_subscription_attributes(
1✔
658
            SubscriptionArn=subscription_arn,
659
            AttributeName="DeliveryPolicy",
660
            AttributeValue=json.dumps(delivery_policy),
661
        )
662

663
        if raw_message_delivery:
1✔
664
            aws_client.sns.set_subscription_attributes(
1✔
665
                SubscriptionArn=subscription_arn,
666
                AttributeName="RawMessageDelivery",
667
                AttributeValue="true",
668
            )
669

670
        return topic_arn, subscription_arn, endpoint_url, server
1✔
671

672
    yield _create_http_endpoint
1✔
673

674
    for http_server in http_servers:
1✔
675
        if http_server.is_running():
1✔
676
            http_server.stop()
1✔
677

678

679
@pytest.fixture
1✔
680
def route53_hosted_zone(aws_client):
1✔
681
    hosted_zones = []
1✔
682

683
    def factory(**kwargs):
1✔
684
        if "Name" not in kwargs:
1✔
685
            kwargs["Name"] = f"www.{short_uid()}.com."
1✔
686
        if "CallerReference" not in kwargs:
1✔
687
            kwargs["CallerReference"] = f"caller-ref-{short_uid()}"
1✔
688
        response = aws_client.route53.create_hosted_zone(
1✔
689
            Name=kwargs["Name"], CallerReference=kwargs["CallerReference"]
690
        )
691
        hosted_zones.append(response["HostedZone"]["Id"])
1✔
692
        return response
1✔
693

694
    yield factory
1✔
695

696
    for zone in hosted_zones:
1✔
697
        try:
1✔
698
            aws_client.route53.delete_hosted_zone(Id=zone)
1✔
699
        except Exception as e:
1✔
700
            LOG.debug("error cleaning up route53 HostedZone %s: %s", zone, e)
1✔
701

702

703
@pytest.fixture
1✔
704
def transcribe_create_job(s3_bucket, aws_client):
1✔
705
    job_names = []
1✔
706

707
    def _create_job(audio_file: str, params: Optional[dict[str, Any]] = None) -> str:
1✔
708
        s3_key = "test-clip.wav"
1✔
709

710
        if not params:
1✔
711
            params = {}
1✔
712

713
        if "TranscriptionJobName" not in params:
1✔
714
            params["TranscriptionJobName"] = f"test-transcribe-{short_uid()}"
1✔
715

716
        if "LanguageCode" not in params:
1✔
717
            params["LanguageCode"] = "en-GB"
1✔
718

719
        if "Media" not in params:
1✔
720
            params["Media"] = {"MediaFileUri": f"s3://{s3_bucket}/{s3_key}"}
1✔
721

722
        # upload test wav to a s3 bucket
723
        with open(audio_file, "rb") as f:
1✔
724
            aws_client.s3.upload_fileobj(f, s3_bucket, s3_key)
1✔
725

726
        response = aws_client.transcribe.start_transcription_job(**params)
1✔
727

728
        job_name = response["TranscriptionJob"]["TranscriptionJobName"]
1✔
729
        job_names.append(job_name)
1✔
730

731
        return job_name
1✔
732

733
    yield _create_job
1✔
734

735
    for job_name in job_names:
1✔
736
        with contextlib.suppress(ClientError):
1✔
737
            aws_client.transcribe.delete_transcription_job(TranscriptionJobName=job_name)
1✔
738

739

740
@pytest.fixture
1✔
741
def kinesis_create_stream(aws_client):
1✔
742
    stream_names = []
1✔
743

744
    def _create_stream(**kwargs):
1✔
745
        if "StreamName" not in kwargs:
1✔
746
            kwargs["StreamName"] = f"test-stream-{short_uid()}"
1✔
747
        aws_client.kinesis.create_stream(**kwargs)
1✔
748
        stream_names.append(kwargs["StreamName"])
1✔
749
        return kwargs["StreamName"]
1✔
750

751
    yield _create_stream
1✔
752

753
    for stream_name in stream_names:
1✔
754
        try:
1✔
755
            aws_client.kinesis.delete_stream(StreamName=stream_name, EnforceConsumerDeletion=True)
1✔
756
        except Exception as e:
×
757
            LOG.debug("error cleaning up kinesis stream %s: %s", stream_name, e)
×
758

759

760
@pytest.fixture
1✔
761
def wait_for_stream_ready(aws_client):
1✔
762
    def _wait_for_stream_ready(stream_name: str):
1✔
763
        def is_stream_ready():
1✔
764
            describe_stream_response = aws_client.kinesis.describe_stream(StreamName=stream_name)
1✔
765
            return describe_stream_response["StreamDescription"]["StreamStatus"] in [
1✔
766
                "ACTIVE",
767
                "UPDATING",
768
            ]
769

770
        return poll_condition(is_stream_ready)
1✔
771

772
    return _wait_for_stream_ready
1✔
773

774

775
@pytest.fixture
1✔
776
def wait_for_delivery_stream_ready(aws_client):
1✔
777
    def _wait_for_stream_ready(delivery_stream_name: str):
1✔
778
        def is_stream_ready():
1✔
779
            describe_stream_response = aws_client.firehose.describe_delivery_stream(
1✔
780
                DeliveryStreamName=delivery_stream_name
781
            )
782
            return (
1✔
783
                describe_stream_response["DeliveryStreamDescription"]["DeliveryStreamStatus"]
784
                == "ACTIVE"
785
            )
786

787
        poll_condition(is_stream_ready)
1✔
788

789
    return _wait_for_stream_ready
1✔
790

791

792
@pytest.fixture
1✔
793
def wait_for_dynamodb_stream_ready(aws_client):
1✔
794
    def _wait_for_stream_ready(stream_arn: str):
1✔
795
        def is_stream_ready():
1✔
796
            describe_stream_response = aws_client.dynamodbstreams.describe_stream(
1✔
797
                StreamArn=stream_arn
798
            )
799
            return describe_stream_response["StreamDescription"]["StreamStatus"] == "ENABLED"
1✔
800

801
        return poll_condition(is_stream_ready)
1✔
802

803
    return _wait_for_stream_ready
1✔
804

805

806
@pytest.fixture()
1✔
807
def kms_create_key(aws_client_factory):
1✔
808
    key_ids = []
1✔
809

810
    def _create_key(region_name: str = None, **kwargs):
1✔
811
        if "Description" not in kwargs:
1✔
812
            kwargs["Description"] = f"test description - {short_uid()}"
1✔
813
        key_metadata = aws_client_factory(region_name=region_name).kms.create_key(**kwargs)[
1✔
814
            "KeyMetadata"
815
        ]
816
        key_ids.append((region_name, key_metadata["KeyId"]))
1✔
817
        return key_metadata
1✔
818

819
    yield _create_key
1✔
820

821
    for region_name, key_id in key_ids:
1✔
822
        try:
1✔
823
            # shortest amount of time you can schedule the deletion
824
            aws_client_factory(region_name=region_name).kms.schedule_key_deletion(
1✔
825
                KeyId=key_id, PendingWindowInDays=7
826
            )
827
        except Exception as e:
1✔
828
            exception_message = str(e)
1✔
829
            # Some tests schedule their keys for deletion themselves.
830
            if (
1✔
831
                "KMSInvalidStateException" not in exception_message
832
                or "is pending deletion" not in exception_message
833
            ):
834
                LOG.debug("error cleaning up KMS key %s: %s", key_id, e)
×
835

836

837
@pytest.fixture()
1✔
838
def kms_replicate_key(aws_client_factory):
1✔
839
    key_ids = []
1✔
840

841
    def _replicate_key(region_from=None, **kwargs):
1✔
842
        region_to = kwargs.get("ReplicaRegion")
1✔
843
        key_ids.append((region_to, kwargs.get("KeyId")))
1✔
844
        return aws_client_factory(region_name=region_from).kms.replicate_key(**kwargs)
1✔
845

846
    yield _replicate_key
1✔
847

848
    for region_to, key_id in key_ids:
1✔
849
        try:
1✔
850
            # shortest amount of time you can schedule the deletion
851
            aws_client_factory(region_name=region_to).kms.schedule_key_deletion(
1✔
852
                KeyId=key_id, PendingWindowInDays=7
853
            )
854
        except Exception as e:
×
855
            LOG.debug("error cleaning up KMS key %s: %s", key_id, e)
×
856

857

858
# kms_create_key fixture is used here not just to be able to create aliases without a key specified,
859
# but also to make sure that kms_create_key gets executed before and teared down after kms_create_alias -
860
# to make sure that we clean up aliases before keys get cleaned up.
861
@pytest.fixture()
1✔
862
def kms_create_alias(kms_create_key, aws_client):
1✔
863
    aliases = []
1✔
864

865
    def _create_alias(**kwargs):
1✔
866
        if "AliasName" not in kwargs:
1✔
867
            kwargs["AliasName"] = f"alias/{short_uid()}"
1✔
868
        if "TargetKeyId" not in kwargs:
1✔
869
            kwargs["TargetKeyId"] = kms_create_key()["KeyId"]
1✔
870

871
        aws_client.kms.create_alias(**kwargs)
1✔
872
        aliases.append(kwargs["AliasName"])
1✔
873
        return kwargs["AliasName"]
1✔
874

875
    yield _create_alias
1✔
876

877
    for alias in aliases:
1✔
878
        try:
1✔
879
            aws_client.kms.delete_alias(AliasName=alias)
1✔
880
        except Exception as e:
1✔
881
            LOG.debug("error cleaning up KMS alias %s: %s", alias, e)
1✔
882

883

884
@pytest.fixture()
1✔
885
def kms_create_grant(kms_create_key, aws_client):
1✔
886
    grants = []
1✔
887

888
    def _create_grant(**kwargs):
1✔
889
        # Just a random ARN, since KMS in LocalStack currently doesn't validate GranteePrincipal,
890
        # but some GranteePrincipal is required to create a grant.
891
        GRANTEE_PRINCIPAL_ARN = (
1✔
892
            "arn:aws:kms:eu-central-1:123456789876:key/198a5a78-52c3-489f-ac70-b06a4d11027a"
893
        )
894

895
        if "Operations" not in kwargs:
1✔
896
            kwargs["Operations"] = ["Decrypt", "Encrypt"]
1✔
897
        if "GranteePrincipal" not in kwargs:
1✔
898
            kwargs["GranteePrincipal"] = GRANTEE_PRINCIPAL_ARN
1✔
899
        if "KeyId" not in kwargs:
1✔
900
            kwargs["KeyId"] = kms_create_key()["KeyId"]
×
901

902
        grant_id = aws_client.kms.create_grant(**kwargs)["GrantId"]
1✔
903
        grants.append((grant_id, kwargs["KeyId"]))
1✔
904
        return grant_id, kwargs["KeyId"]
1✔
905

906
    yield _create_grant
1✔
907

908
    for grant_id, key_id in grants:
1✔
909
        try:
1✔
910
            aws_client.kms.retire_grant(GrantId=grant_id, KeyId=key_id)
1✔
911
        except Exception as e:
×
912
            LOG.debug("error cleaning up KMS grant %s: %s", grant_id, e)
×
913

914

915
@pytest.fixture
1✔
916
def kms_key(kms_create_key):
1✔
917
    return kms_create_key()
1✔
918

919

920
@pytest.fixture
1✔
921
def kms_grant_and_key(kms_key, aws_client):
1✔
922
    user_arn = aws_client.sts.get_caller_identity()["Arn"]
1✔
923

924
    return [
1✔
925
        aws_client.kms.create_grant(
926
            KeyId=kms_key["KeyId"],
927
            GranteePrincipal=user_arn,
928
            Operations=["Decrypt", "Encrypt"],
929
        ),
930
        kms_key,
931
    ]
932

933

934
@pytest.fixture
1✔
935
def opensearch_wait_for_cluster(aws_client):
1✔
936
    def _wait_for_cluster(domain_name: str):
1✔
937
        def finished_processing():
1✔
938
            status = aws_client.opensearch.describe_domain(DomainName=domain_name)["DomainStatus"]
1✔
939
            return status["Processing"] is False and "Endpoint" in status
1✔
940

941
        assert poll_condition(
1✔
942
            finished_processing, timeout=25 * 60, **({"interval": 10} if is_aws_cloud() else {})
943
        ), f"could not start domain: {domain_name}"
944

945
    return _wait_for_cluster
1✔
946

947

948
@pytest.fixture
1✔
949
def opensearch_create_domain(opensearch_wait_for_cluster, aws_client):
1✔
950
    domains = []
1✔
951

952
    def factory(**kwargs) -> str:
1✔
953
        if "DomainName" not in kwargs:
1✔
954
            kwargs["DomainName"] = f"test-domain-{short_uid()}"
1✔
955

956
        aws_client.opensearch.create_domain(**kwargs)
1✔
957

958
        opensearch_wait_for_cluster(domain_name=kwargs["DomainName"])
1✔
959

960
        domains.append(kwargs["DomainName"])
1✔
961
        return kwargs["DomainName"]
1✔
962

963
    yield factory
1✔
964

965
    # cleanup
966
    for domain in domains:
1✔
967
        try:
1✔
968
            aws_client.opensearch.delete_domain(DomainName=domain)
1✔
969
        except Exception as e:
×
970
            LOG.debug("error cleaning up domain %s: %s", domain, e)
×
971

972

973
@pytest.fixture
1✔
974
def opensearch_domain(opensearch_create_domain) -> str:
1✔
975
    return opensearch_create_domain()
1✔
976

977

978
@pytest.fixture
1✔
979
def opensearch_endpoint(opensearch_domain, aws_client) -> str:
1✔
980
    status = aws_client.opensearch.describe_domain(DomainName=opensearch_domain)["DomainStatus"]
1✔
981
    assert "Endpoint" in status
1✔
982
    return f"https://{status['Endpoint']}"
1✔
983

984

985
@pytest.fixture
1✔
986
def opensearch_document_path(opensearch_endpoint, aws_client):
1✔
987
    document = {
1✔
988
        "first_name": "Boba",
989
        "last_name": "Fett",
990
        "age": 41,
991
        "about": "I'm just a simple man, trying to make my way in the universe.",
992
        "interests": ["mandalorian armor", "tusken culture"],
993
    }
994
    document_path = f"{opensearch_endpoint}/bountyhunters/_doc/1"
1✔
995
    response = requests.put(
1✔
996
        document_path,
997
        data=json.dumps(document),
998
        headers={"content-type": "application/json", "Accept-encoding": "identity"},
999
    )
1000
    assert response.status_code == 201, f"could not create document at: {document_path}"
1✔
1001
    return document_path
1✔
1002

1003

1004
# Cleanup fixtures
1005
@pytest.fixture
1✔
1006
def cleanup_stacks(aws_client):
1✔
1007
    def _cleanup_stacks(stacks: List[str]) -> None:
1✔
1008
        stacks = ensure_list(stacks)
1✔
1009
        for stack in stacks:
1✔
1010
            try:
1✔
1011
                aws_client.cloudformation.delete_stack(StackName=stack)
1✔
1012
                aws_client.cloudformation.get_waiter("stack_delete_complete").wait(StackName=stack)
1✔
1013
            except Exception:
×
1014
                LOG.debug("Failed to cleanup stack '%s'", stack)
×
1015

1016
    return _cleanup_stacks
1✔
1017

1018

1019
@pytest.fixture
1✔
1020
def cleanup_changesets(aws_client):
1✔
1021
    def _cleanup_changesets(changesets: List[str]) -> None:
1✔
1022
        changesets = ensure_list(changesets)
1✔
1023
        for cs in changesets:
1✔
1024
            try:
1✔
1025
                aws_client.cloudformation.delete_change_set(ChangeSetName=cs)
1✔
1026
            except Exception:
1✔
1027
                LOG.debug("Failed to cleanup changeset '%s'", cs)
1✔
1028

1029
    return _cleanup_changesets
1✔
1030

1031

1032
# Helpers for Cfn
1033

1034

1035
# TODO: exports(!)
1036
@dataclasses.dataclass(frozen=True)
1✔
1037
class DeployResult:
1✔
1038
    change_set_id: str
1✔
1039
    stack_id: str
1✔
1040
    stack_name: str
1✔
1041
    change_set_name: str
1✔
1042
    outputs: Dict[str, str]
1✔
1043

1044
    destroy: Callable[[], None]
1✔
1045

1046

1047
class StackDeployError(Exception):
1✔
1048
    def __init__(self, describe_res: dict, events: list[dict]):
1✔
1049
        self.describe_result = describe_res
1✔
1050
        self.events = events
1✔
1051

1052
        encoded_describe_output = json.dumps(self.describe_result, cls=CustomEncoder)
1✔
1053
        if config.CFN_VERBOSE_ERRORS:
1✔
1054
            msg = f"Describe output:\n{encoded_describe_output}\nEvents:\n{self.format_events(events)}"
×
1055
        else:
1056
            msg = f"Describe output:\n{encoded_describe_output}\nFailing resources:\n{self.format_events(events)}"
1✔
1057

1058
        super().__init__(msg)
1✔
1059

1060
    def format_events(self, events: list[dict]) -> str:
1✔
1061
        formatted_events = []
1✔
1062

1063
        chronological_events = sorted(events, key=lambda event: event["Timestamp"])
1✔
1064
        for event in chronological_events:
1✔
1065
            if event["ResourceStatus"].endswith("FAILED") or config.CFN_VERBOSE_ERRORS:
1✔
1066
                formatted_events.append(self.format_event(event))
1✔
1067

1068
        return "\n".join(formatted_events)
1✔
1069

1070
    @staticmethod
1✔
1071
    def format_event(event: dict) -> str:
1✔
1072
        if reason := event.get("ResourceStatusReason"):
1✔
1073
            reason = reason.replace("\n", "; ")
1✔
1074
            return f"- {event['LogicalResourceId']} ({event['ResourceType']}) -> {event['ResourceStatus']} ({reason})"
1✔
1075
        else:
1076
            return f"- {event['LogicalResourceId']} ({event['ResourceType']}) -> {event['ResourceStatus']}"
×
1077

1078

1079
@pytest.fixture
1✔
1080
def deploy_cfn_template(
1✔
1081
    aws_client: ServiceLevelClientFactory,
1082
):
1083
    state: list[tuple[str, Callable]] = []
1✔
1084

1085
    def _deploy(
1✔
1086
        *,
1087
        is_update: Optional[bool] = False,
1088
        stack_name: Optional[str] = None,
1089
        change_set_name: Optional[str] = None,
1090
        template: Optional[str] = None,
1091
        template_path: Optional[str | os.PathLike] = None,
1092
        template_mapping: Optional[Dict[str, Any]] = None,
1093
        parameters: Optional[Dict[str, str]] = None,
1094
        role_arn: Optional[str] = None,
1095
        max_wait: Optional[int] = None,
1096
        delay_between_polls: Optional[int] = 2,
1097
        custom_aws_client: Optional[ServiceLevelClientFactory] = None,
1098
    ) -> DeployResult:
1099
        if is_update:
1✔
1100
            assert stack_name
1✔
1101
        stack_name = stack_name or f"stack-{short_uid()}"
1✔
1102
        change_set_name = change_set_name or f"change-set-{short_uid()}"
1✔
1103

1104
        if max_wait is None:
1✔
1105
            max_wait = 1800 if is_aws_cloud() else 180
1✔
1106

1107
        if template_path is not None:
1✔
1108
            template = load_template_file(template_path)
1✔
1109
        template_rendered = render_template(template, **(template_mapping or {}))
1✔
1110

1111
        kwargs = dict(
1✔
1112
            StackName=stack_name,
1113
            ChangeSetName=change_set_name,
1114
            TemplateBody=template_rendered,
1115
            Capabilities=["CAPABILITY_AUTO_EXPAND", "CAPABILITY_IAM", "CAPABILITY_NAMED_IAM"],
1116
            ChangeSetType=("UPDATE" if is_update else "CREATE"),
1117
            Parameters=[
1118
                {
1119
                    "ParameterKey": k,
1120
                    "ParameterValue": v,
1121
                }
1122
                for (k, v) in (parameters or {}).items()
1123
            ],
1124
        )
1125
        if role_arn is not None:
1✔
1126
            kwargs["RoleARN"] = role_arn
×
1127

1128
        cfn_aws_client = custom_aws_client if custom_aws_client is not None else aws_client
1✔
1129

1130
        response = cfn_aws_client.cloudformation.create_change_set(**kwargs)
1✔
1131

1132
        change_set_id = response["Id"]
1✔
1133
        stack_id = response["StackId"]
1✔
1134

1135
        cfn_aws_client.cloudformation.get_waiter(WAITER_CHANGE_SET_CREATE_COMPLETE).wait(
1✔
1136
            ChangeSetName=change_set_id
1137
        )
1138
        cfn_aws_client.cloudformation.execute_change_set(ChangeSetName=change_set_id)
1✔
1139
        stack_waiter = cfn_aws_client.cloudformation.get_waiter(
1✔
1140
            WAITER_STACK_UPDATE_COMPLETE if is_update else WAITER_STACK_CREATE_COMPLETE
1141
        )
1142

1143
        try:
1✔
1144
            stack_waiter.wait(
1✔
1145
                StackName=stack_id,
1146
                WaiterConfig={
1147
                    "Delay": delay_between_polls,
1148
                    "MaxAttempts": max_wait / delay_between_polls,
1149
                },
1150
            )
1151
        except botocore.exceptions.WaiterError as e:
1✔
1152
            raise StackDeployError(
1✔
1153
                cfn_aws_client.cloudformation.describe_stacks(StackName=stack_id)["Stacks"][0],
1154
                cfn_aws_client.cloudformation.describe_stack_events(StackName=stack_id)[
1155
                    "StackEvents"
1156
                ],
1157
            ) from e
1158

1159
        describe_stack_res = cfn_aws_client.cloudformation.describe_stacks(StackName=stack_id)[
1✔
1160
            "Stacks"
1161
        ][0]
1162
        outputs = describe_stack_res.get("Outputs", [])
1✔
1163

1164
        mapped_outputs = {o["OutputKey"]: o.get("OutputValue") for o in outputs}
1✔
1165

1166
        def _destroy_stack():
1✔
1167
            cfn_aws_client.cloudformation.delete_stack(StackName=stack_id)
1✔
1168
            cfn_aws_client.cloudformation.get_waiter(WAITER_STACK_DELETE_COMPLETE).wait(
1✔
1169
                StackName=stack_id,
1170
                WaiterConfig={
1171
                    "Delay": delay_between_polls,
1172
                    "MaxAttempts": max_wait / delay_between_polls,
1173
                },
1174
            )
1175

1176
        state.append((stack_id, _destroy_stack))
1✔
1177

1178
        return DeployResult(
1✔
1179
            change_set_id, stack_id, stack_name, change_set_name, mapped_outputs, _destroy_stack
1180
        )
1181

1182
    yield _deploy
1✔
1183

1184
    # delete the stacks in the reverse order they were created in case of inter-stack dependencies
1185
    for stack_id, teardown in state[::-1]:
1✔
1186
        try:
1✔
1187
            teardown()
1✔
1188
        except Exception as e:
1✔
1189
            LOG.debug("Failed cleaning up stack stack_id=%s: %s", stack_id, e)
1✔
1190

1191

1192
@pytest.fixture
1✔
1193
def is_change_set_created_and_available(aws_client):
1✔
1194
    def _is_change_set_created_and_available(change_set_id: str):
1✔
1195
        def _inner():
1✔
1196
            change_set = aws_client.cloudformation.describe_change_set(ChangeSetName=change_set_id)
1✔
1197
            return (
1✔
1198
                # TODO: CREATE_FAILED should also not lead to further retries
1199
                change_set.get("Status") == "CREATE_COMPLETE"
1200
                and change_set.get("ExecutionStatus") == "AVAILABLE"
1201
            )
1202

1203
        return _inner
1✔
1204

1205
    return _is_change_set_created_and_available
1✔
1206

1207

1208
@pytest.fixture
1✔
1209
def is_change_set_failed_and_unavailable(aws_client):
1✔
1210
    def _is_change_set_created_and_available(change_set_id: str):
×
1211
        def _inner():
×
1212
            change_set = aws_client.cloudformation.describe_change_set(ChangeSetName=change_set_id)
×
1213
            return (
×
1214
                # TODO: CREATE_FAILED should also not lead to further retries
1215
                change_set.get("Status") == "FAILED"
1216
                and change_set.get("ExecutionStatus") == "UNAVAILABLE"
1217
            )
1218

1219
        return _inner
×
1220

1221
    return _is_change_set_created_and_available
×
1222

1223

1224
@pytest.fixture
1✔
1225
def is_stack_created(aws_client):
1✔
1226
    return _has_stack_status(aws_client.cloudformation, ["CREATE_COMPLETE", "CREATE_FAILED"])
1✔
1227

1228

1229
@pytest.fixture
1✔
1230
def is_stack_updated(aws_client):
1✔
1231
    return _has_stack_status(aws_client.cloudformation, ["UPDATE_COMPLETE", "UPDATE_FAILED"])
×
1232

1233

1234
@pytest.fixture
1✔
1235
def is_stack_deleted(aws_client):
1✔
1236
    return _has_stack_status(aws_client.cloudformation, ["DELETE_COMPLETE"])
×
1237

1238

1239
def _has_stack_status(cfn_client, statuses: List[str]):
1✔
1240
    def _has_status(stack_id: str):
1✔
1241
        def _inner():
1✔
1242
            resp = cfn_client.describe_stacks(StackName=stack_id)
1✔
1243
            s = resp["Stacks"][0]  # since the lookup  uses the id we can only get a single response
1✔
1244
            return s.get("StackStatus") in statuses
1✔
1245

1246
        return _inner
1✔
1247

1248
    return _has_status
1✔
1249

1250

1251
@pytest.fixture
1✔
1252
def is_change_set_finished(aws_client):
1✔
1253
    def _is_change_set_finished(change_set_id: str, stack_name: Optional[str] = None):
1✔
1254
        def _inner():
1✔
1255
            kwargs = {"ChangeSetName": change_set_id}
1✔
1256
            if stack_name:
1✔
1257
                kwargs["StackName"] = stack_name
×
1258

1259
            check_set = aws_client.cloudformation.describe_change_set(**kwargs)
1✔
1260

1261
            if check_set.get("ExecutionStatus") == "EXECUTE_FAILED":
1✔
1262
                LOG.warning("Change set failed")
×
1263
                raise ShortCircuitWaitException()
×
1264

1265
            return check_set.get("ExecutionStatus") == "EXECUTE_COMPLETE"
1✔
1266

1267
        return _inner
1✔
1268

1269
    return _is_change_set_finished
1✔
1270

1271

1272
@pytest.fixture
1✔
1273
def wait_until_lambda_ready(aws_client):
1✔
1274
    def _wait_until_ready(function_name: str, qualifier: str = None, client=None):
1✔
1275
        client = client or aws_client.lambda_
1✔
1276

1277
        def _is_not_pending():
1✔
1278
            kwargs = {}
1✔
1279
            if qualifier:
1✔
1280
                kwargs["Qualifier"] = qualifier
×
1281
            try:
1✔
1282
                result = (
1✔
1283
                    client.get_function(FunctionName=function_name)["Configuration"]["State"]
1284
                    != "Pending"
1285
                )
1286
                LOG.debug("lambda state result: result=%s", result)
1✔
1287
                return result
1✔
1288
            except Exception as e:
×
1289
                LOG.error(e)
×
1290
                raise
×
1291

1292
        wait_until(_is_not_pending)
1✔
1293

1294
    return _wait_until_ready
1✔
1295

1296

1297
role_assume_policy = """
1✔
1298
{
1299
  "Version": "2012-10-17",
1300
  "Statement": [
1301
    {
1302
      "Effect": "Allow",
1303
      "Principal": {
1304
        "Service": "lambda.amazonaws.com"
1305
      },
1306
      "Action": "sts:AssumeRole"
1307
    }
1308
  ]
1309
}
1310
""".strip()
1311

1312
role_policy = """
1✔
1313
{
1314
    "Version": "2012-10-17",
1315
    "Statement": [
1316
        {
1317
            "Effect": "Allow",
1318
            "Action": [
1319
                "logs:CreateLogGroup",
1320
                "logs:CreateLogStream",
1321
                "logs:PutLogEvents"
1322
            ],
1323
            "Resource": [
1324
                "*"
1325
            ]
1326
        }
1327
    ]
1328
}
1329
""".strip()
1330

1331

1332
@pytest.fixture
1✔
1333
def create_lambda_function_aws(aws_client):
1✔
1334
    lambda_arns = []
1✔
1335

1336
    def _create_lambda_function(**kwargs):
1✔
1337
        def _create_function():
1✔
1338
            resp = aws_client.lambda_.create_function(**kwargs)
1✔
1339
            lambda_arns.append(resp["FunctionArn"])
1✔
1340

1341
            def _is_not_pending():
1✔
1342
                try:
1✔
1343
                    result = (
1✔
1344
                        aws_client.lambda_.get_function(FunctionName=resp["FunctionName"])[
1345
                            "Configuration"
1346
                        ]["State"]
1347
                        != "Pending"
1348
                    )
1349
                    return result
1✔
1350
                except Exception as e:
×
1351
                    LOG.error(e)
×
1352
                    raise
×
1353

1354
            wait_until(_is_not_pending)
1✔
1355
            return resp
1✔
1356

1357
        # @AWS, takes about 10s until the role/policy is "active", until then it will fail
1358
        # localstack should normally not require the retries and will just continue here
1359
        return retry(_create_function, retries=3, sleep=4)
1✔
1360

1361
    yield _create_lambda_function
1✔
1362

1363
    for arn in lambda_arns:
1✔
1364
        try:
1✔
1365
            aws_client.lambda_.delete_function(FunctionName=arn)
1✔
1366
        except Exception:
×
1367
            LOG.debug("Unable to delete function arn=%s in cleanup", arn)
×
1368

1369

1370
@pytest.fixture
1✔
1371
def create_lambda_function(aws_client, wait_until_lambda_ready, lambda_su_role):
1✔
1372
    lambda_arns_and_clients = []
1✔
1373
    log_groups = []
1✔
1374
    lambda_client = aws_client.lambda_
1✔
1375
    logs_client = aws_client.logs
1✔
1376
    s3_client = aws_client.s3
1✔
1377

1378
    def _create_lambda_function(*args, **kwargs):
1✔
1379
        client = kwargs.get("client") or lambda_client
1✔
1380
        kwargs["client"] = client
1✔
1381
        kwargs["s3_client"] = s3_client
1✔
1382
        func_name = kwargs.get("func_name")
1✔
1383
        assert func_name
1✔
1384
        del kwargs["func_name"]
1✔
1385

1386
        if not kwargs.get("role"):
1✔
1387
            kwargs["role"] = lambda_su_role
1✔
1388

1389
        def _create_function():
1✔
1390
            resp = testutil.create_lambda_function(func_name, **kwargs)
1✔
1391
            lambda_arns_and_clients.append((resp["CreateFunctionResponse"]["FunctionArn"], client))
1✔
1392
            wait_until_lambda_ready(function_name=func_name, client=client)
1✔
1393
            log_group_name = f"/aws/lambda/{func_name}"
1✔
1394
            log_groups.append(log_group_name)
1✔
1395
            return resp
1✔
1396

1397
        # @AWS, takes about 10s until the role/policy is "active", until then it will fail
1398
        # localstack should normally not require the retries and will just continue here
1399
        return retry(_create_function, retries=3, sleep=4)
1✔
1400

1401
    yield _create_lambda_function
1✔
1402

1403
    for arn, client in lambda_arns_and_clients:
1✔
1404
        try:
1✔
1405
            client.delete_function(FunctionName=arn)
1✔
1406
        except Exception:
1✔
1407
            LOG.debug("Unable to delete function arn=%s in cleanup", arn)
1✔
1408

1409
    for log_group_name in log_groups:
1✔
1410
        try:
1✔
1411
            logs_client.delete_log_group(logGroupName=log_group_name)
1✔
1412
        except Exception:
1✔
1413
            LOG.debug("Unable to delete log group %s in cleanup", log_group_name)
1✔
1414

1415

1416
@pytest.fixture
1✔
1417
def create_echo_http_server(aws_client, create_lambda_function):
1✔
1418
    from localstack.aws.api.lambda_ import Runtime
1✔
1419

1420
    lambda_client = aws_client.lambda_
1✔
1421
    handler_code = textwrap.dedent(
1✔
1422
        """
1423
    import json
1424
    import os
1425

1426

1427
    def make_response(body: dict, status_code: int = 200):
1428
        return {
1429
            "statusCode": status_code,
1430
            "headers": {"Content-Type": "application/json"},
1431
            "body": body,
1432
        }
1433

1434

1435
    def trim_headers(headers):
1436
        if not int(os.getenv("TRIM_X_HEADERS", 0)):
1437
            return headers
1438
        return {
1439
            key: value for key, value in headers.items()
1440
            if not (key.startswith("x-amzn") or key.startswith("x-forwarded-"))
1441
        }
1442

1443

1444
    def handler(event, context):
1445
        print(json.dumps(event))
1446
        response = {
1447
            "args": event.get("queryStringParameters", {}),
1448
            "data": event.get("body", ""),
1449
            "domain": event["requestContext"].get("domainName", ""),
1450
            "headers": trim_headers(event.get("headers", {})),
1451
            "method": event["requestContext"]["http"].get("method", ""),
1452
            "origin": event["requestContext"]["http"].get("sourceIp", ""),
1453
            "path": event["requestContext"]["http"].get("path", ""),
1454
        }
1455
        return make_response(response)"""
1456
    )
1457

1458
    def _create_echo_http_server(trim_x_headers: bool = False) -> str:
1✔
1459
        """Creates a server that will echo any request. Any request will be returned with the
1460
        following format. Any unset values will have those defaults.
1461
        `trim_x_headers` can be set to True to trim some headers that are automatically added by lambda in
1462
        order to create easier Snapshot testing. Default: `False`
1463
        {
1464
          "args": {},
1465
          "headers": {},
1466
          "data": "",
1467
          "method": "",
1468
          "domain": "",
1469
          "origin": "",
1470
          "path": ""
1471
        }"""
1472
        zip_file = testutil.create_lambda_archive(handler_code, get_content=True)
1✔
1473
        func_name = f"echo-http-{short_uid()}"
1✔
1474
        create_lambda_function(
1✔
1475
            func_name=func_name,
1476
            zip_file=zip_file,
1477
            runtime=Runtime.python3_12,
1478
            envvars={"TRIM_X_HEADERS": "1" if trim_x_headers else "0"},
1479
        )
1480
        url_response = lambda_client.create_function_url_config(
1✔
1481
            FunctionName=func_name, AuthType="NONE"
1482
        )
1483
        aws_client.lambda_.add_permission(
1✔
1484
            FunctionName=func_name,
1485
            StatementId="urlPermission",
1486
            Action="lambda:InvokeFunctionUrl",
1487
            Principal="*",
1488
            FunctionUrlAuthType="NONE",
1489
        )
1490
        return url_response["FunctionUrl"]
1✔
1491

1492
    yield _create_echo_http_server
1✔
1493

1494

1495
@pytest.fixture
1✔
1496
def create_event_source_mapping(aws_client):
1✔
1497
    uuids = []
1✔
1498

1499
    def _create_event_source_mapping(*args, **kwargs):
1✔
1500
        response = aws_client.lambda_.create_event_source_mapping(*args, **kwargs)
1✔
1501
        uuids.append(response["UUID"])
1✔
1502
        return response
1✔
1503

1504
    yield _create_event_source_mapping
1✔
1505

1506
    for uuid in uuids:
1✔
1507
        try:
1✔
1508
            aws_client.lambda_.delete_event_source_mapping(UUID=uuid)
1✔
1509
        except Exception:
×
1510
            LOG.debug("Unable to delete event source mapping %s in cleanup", uuid)
×
1511

1512

1513
@pytest.fixture
1✔
1514
def check_lambda_logs(aws_client):
1✔
1515
    def _check_logs(func_name: str, expected_lines: List[str] = None) -> List[str]:
1✔
1516
        if not expected_lines:
1✔
1517
            expected_lines = []
×
1518
        log_events = get_lambda_logs(func_name, logs_client=aws_client.logs)
1✔
1519
        log_messages = [e["message"] for e in log_events]
1✔
1520
        for line in expected_lines:
1✔
1521
            if ".*" in line:
1✔
1522
                found = [re.match(line, m, flags=re.DOTALL) for m in log_messages]
1✔
1523
                if any(found):
1✔
1524
                    continue
1✔
1525
            assert line in log_messages
×
1526
        return log_messages
1✔
1527

1528
    return _check_logs
1✔
1529

1530

1531
@pytest.fixture
1✔
1532
def create_policy(aws_client):
1✔
1533
    policy_arns = []
1✔
1534

1535
    def _create_policy(*args, iam_client=None, **kwargs):
1✔
1536
        iam_client = iam_client or aws_client.iam
1✔
1537
        if "PolicyName" not in kwargs:
1✔
1538
            kwargs["PolicyName"] = f"policy-{short_uid()}"
×
1539
        response = iam_client.create_policy(*args, **kwargs)
1✔
1540
        policy_arn = response["Policy"]["Arn"]
1✔
1541
        policy_arns.append((policy_arn, iam_client))
1✔
1542
        return response
1✔
1543

1544
    yield _create_policy
1✔
1545

1546
    for policy_arn, iam_client in policy_arns:
1✔
1547
        try:
1✔
1548
            iam_client.delete_policy(PolicyArn=policy_arn)
1✔
1549
        except Exception:
1✔
1550
            LOG.debug("Could not delete policy '%s' during test cleanup", policy_arn)
1✔
1551

1552

1553
@pytest.fixture
1✔
1554
def create_user(aws_client):
1✔
1555
    usernames = []
1✔
1556

1557
    def _create_user(**kwargs):
1✔
1558
        if "UserName" not in kwargs:
1✔
1559
            kwargs["UserName"] = f"user-{short_uid()}"
×
1560
        response = aws_client.iam.create_user(**kwargs)
1✔
1561
        usernames.append(response["User"]["UserName"])
1✔
1562
        return response
1✔
1563

1564
    yield _create_user
1✔
1565

1566
    for username in usernames:
1✔
1567
        try:
1✔
1568
            inline_policies = aws_client.iam.list_user_policies(UserName=username)["PolicyNames"]
1✔
1569
        except ClientError as e:
1✔
1570
            LOG.debug(
1✔
1571
                "Cannot list user policies: %s. User %s probably already deleted...", e, username
1572
            )
1573
            continue
1✔
1574

1575
        for inline_policy in inline_policies:
1✔
1576
            try:
1✔
1577
                aws_client.iam.delete_user_policy(UserName=username, PolicyName=inline_policy)
1✔
1578
            except Exception:
×
1579
                LOG.debug(
×
1580
                    "Could not delete user policy '%s' from '%s' during cleanup",
1581
                    inline_policy,
1582
                    username,
1583
                )
1584
        attached_policies = aws_client.iam.list_attached_user_policies(UserName=username)[
1✔
1585
            "AttachedPolicies"
1586
        ]
1587
        for attached_policy in attached_policies:
1✔
1588
            try:
1✔
1589
                aws_client.iam.detach_user_policy(
1✔
1590
                    UserName=username, PolicyArn=attached_policy["PolicyArn"]
1591
                )
1592
            except Exception:
1✔
1593
                LOG.debug(
1✔
1594
                    "Error detaching policy '%s' from user '%s'",
1595
                    attached_policy["PolicyArn"],
1596
                    username,
1597
                )
1598
        access_keys = aws_client.iam.list_access_keys(UserName=username)["AccessKeyMetadata"]
1✔
1599
        for access_key in access_keys:
1✔
1600
            try:
1✔
1601
                aws_client.iam.delete_access_key(
1✔
1602
                    UserName=username, AccessKeyId=access_key["AccessKeyId"]
1603
                )
1604
            except Exception:
×
1605
                LOG.debug(
×
1606
                    "Error deleting access key '%s' from user '%s'",
1607
                    access_key["AccessKeyId"],
1608
                    username,
1609
                )
1610

1611
        try:
1✔
1612
            aws_client.iam.delete_user(UserName=username)
1✔
1613
        except Exception as e:
1✔
1614
            LOG.debug("Error deleting user '%s' during test cleanup: %s", username, e)
1✔
1615

1616

1617
@pytest.fixture
1✔
1618
def wait_and_assume_role(aws_client):
1✔
1619
    def _wait_and_assume_role(role_arn: str, session_name: str = None, **kwargs):
1✔
1620
        if not session_name:
1✔
1621
            session_name = f"session-{short_uid()}"
1✔
1622

1623
        def assume_role():
1✔
1624
            return aws_client.sts.assume_role(
1✔
1625
                RoleArn=role_arn, RoleSessionName=session_name, **kwargs
1626
            )["Credentials"]
1627

1628
        # need to retry a couple of times before we are allowed to assume this role in AWS
1629
        keys = retry(assume_role, sleep=5, retries=4)
1✔
1630
        return keys
1✔
1631

1632
    return _wait_and_assume_role
1✔
1633

1634

1635
@pytest.fixture
1✔
1636
def create_role(aws_client):
1✔
1637
    role_names = []
1✔
1638

1639
    def _create_role(iam_client=None, **kwargs):
1✔
1640
        if not kwargs.get("RoleName"):
1✔
1641
            kwargs["RoleName"] = f"role-{short_uid()}"
×
1642
        iam_client = iam_client or aws_client.iam
1✔
1643
        result = iam_client.create_role(**kwargs)
1✔
1644
        role_names.append((result["Role"]["RoleName"], iam_client))
1✔
1645
        return result
1✔
1646

1647
    yield _create_role
1✔
1648

1649
    for role_name, iam_client in role_names:
1✔
1650
        # detach policies
1651
        try:
1✔
1652
            attached_policies = iam_client.list_attached_role_policies(RoleName=role_name)[
1✔
1653
                "AttachedPolicies"
1654
            ]
1655
        except ClientError as e:
1✔
1656
            LOG.debug(
1✔
1657
                "Cannot list attached role policies: %s. Role %s probably already deleted...",
1658
                e,
1659
                role_name,
1660
            )
1661
            continue
1✔
1662
        for attached_policy in attached_policies:
1✔
1663
            try:
1✔
1664
                iam_client.detach_role_policy(
1✔
1665
                    RoleName=role_name, PolicyArn=attached_policy["PolicyArn"]
1666
                )
1667
            except Exception:
×
1668
                LOG.debug(
×
1669
                    "Could not detach role policy '%s' from '%s' during cleanup",
1670
                    attached_policy["PolicyArn"],
1671
                    role_name,
1672
                )
1673
        role_policies = iam_client.list_role_policies(RoleName=role_name)["PolicyNames"]
1✔
1674
        for role_policy in role_policies:
1✔
1675
            try:
1✔
1676
                iam_client.delete_role_policy(RoleName=role_name, PolicyName=role_policy)
1✔
1677
            except Exception:
×
1678
                LOG.debug(
×
1679
                    "Could not delete role policy '%s' from '%s' during cleanup",
1680
                    role_policy,
1681
                    role_name,
1682
                )
1683
        try:
1✔
1684
            iam_client.delete_role(RoleName=role_name)
1✔
1685
        except Exception:
×
1686
            LOG.debug("Could not delete role '%s' during cleanup", role_name)
×
1687

1688

1689
@pytest.fixture
1✔
1690
def create_parameter(aws_client):
1✔
1691
    params = []
1✔
1692

1693
    def _create_parameter(**kwargs):
1✔
1694
        params.append(kwargs["Name"])
1✔
1695
        return aws_client.ssm.put_parameter(**kwargs)
1✔
1696

1697
    yield _create_parameter
1✔
1698

1699
    for param in params:
1✔
1700
        aws_client.ssm.delete_parameter(Name=param)
1✔
1701

1702

1703
@pytest.fixture
1✔
1704
def create_secret(aws_client):
1✔
1705
    items = []
1✔
1706

1707
    def _create_parameter(**kwargs):
1✔
1708
        create_response = aws_client.secretsmanager.create_secret(**kwargs)
1✔
1709
        items.append(create_response["ARN"])
1✔
1710
        return create_response
1✔
1711

1712
    yield _create_parameter
1✔
1713

1714
    for item in items:
1✔
1715
        aws_client.secretsmanager.delete_secret(SecretId=item, ForceDeleteWithoutRecovery=True)
1✔
1716

1717

1718
# TODO Figure out how to make cert creation tests pass against AWS.
1719
#
1720
# We would like to have localstack tests to pass not just against localstack, but also against AWS to make sure
1721
# our emulation is correct. Unfortunately, with certificate creation there are some issues.
1722
#
1723
# In AWS newly created ACM certificates have to be validated either by email or by DNS. The latter is
1724
# by adding some CNAME records as requested by ASW in response to a certificate request.
1725
# For testing purposes the DNS one seems to be easier, at least as long as DNS is handled by Region53 AWS DNS service.
1726
#
1727
# The other possible option is to use IAM certificates instead of ACM ones. Those just have to be uploaded from files
1728
# created by openssl etc. Not sure if there are other issues after that.
1729
#
1730
# The third option might be having in AWS some certificates created in advance - so they do not require validation
1731
# and can be easily used in tests. The issie with such an approach is that for AppSync, for example, in order to
1732
# register a domain name (https://docs.aws.amazon.com/appsync/latest/APIReference/API_CreateDomainName.html),
1733
# the domain name in the API request has to match the domain name used in certificate creation. Which means that with
1734
# pre-created certificates we would have to use specific domain names instead of random ones.
1735
@pytest.fixture
1✔
1736
def acm_request_certificate(aws_client_factory):
1✔
1737
    certificate_arns = []
1✔
1738

1739
    def factory(**kwargs) -> str:
1✔
1740
        if "DomainName" not in kwargs:
1✔
1741
            kwargs["DomainName"] = f"test-domain-{short_uid()}.localhost.localstack.cloud"
1✔
1742

1743
        region_name = kwargs.pop("region_name", None)
1✔
1744
        acm_client = aws_client_factory(region_name=region_name).acm
1✔
1745

1746
        response = acm_client.request_certificate(**kwargs)
1✔
1747
        created_certificate_arn = response["CertificateArn"]
1✔
1748
        certificate_arns.append((created_certificate_arn, region_name))
1✔
1749
        return response
1✔
1750

1751
    yield factory
1✔
1752

1753
    # cleanup
1754
    for certificate_arn, region_name in certificate_arns:
1✔
1755
        try:
1✔
1756
            acm_client = aws_client_factory(region_name=region_name).acm
1✔
1757
            acm_client.delete_certificate(CertificateArn=certificate_arn)
1✔
1758
        except Exception as e:
×
1759
            LOG.debug("error cleaning up certificate %s: %s", certificate_arn, e)
×
1760

1761

1762
role_policy_su = {
1✔
1763
    "Version": "2012-10-17",
1764
    "Statement": [{"Effect": "Allow", "Action": ["*"], "Resource": ["*"]}],
1765
}
1766

1767

1768
@pytest.fixture(scope="session")
1✔
1769
def lambda_su_role(aws_client):
1✔
1770
    role_name = f"lambda-autogenerated-{short_uid()}"
1✔
1771
    role = aws_client.iam.create_role(
1✔
1772
        RoleName=role_name, AssumeRolePolicyDocument=role_assume_policy
1773
    )["Role"]
1774
    policy_name = f"lambda-autogenerated-{short_uid()}"
1✔
1775
    policy_arn = aws_client.iam.create_policy(
1✔
1776
        PolicyName=policy_name, PolicyDocument=json.dumps(role_policy_su)
1777
    )["Policy"]["Arn"]
1778
    aws_client.iam.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
1✔
1779

1780
    if is_aws_cloud():  # dirty but necessary
1✔
1781
        time.sleep(10)
×
1782

1783
    yield role["Arn"]
1✔
1784

1785
    run_safe(aws_client.iam.detach_role_policy(RoleName=role_name, PolicyArn=policy_arn))
1✔
1786
    run_safe(aws_client.iam.delete_role(RoleName=role_name))
1✔
1787
    run_safe(aws_client.iam.delete_policy(PolicyArn=policy_arn))
1✔
1788

1789

1790
@pytest.fixture
1✔
1791
def create_iam_role_and_attach_policy(aws_client):
1✔
1792
    """
1793
    Fixture that creates an IAM role with given role definition and predefined policy ARN.
1794

1795
    Use this fixture with AWS managed policies like 'AmazonS3ReadOnlyAccess' or 'AmazonKinesisFullAccess'.
1796
    """
1797
    roles = []
×
1798

1799
    def _inner(**kwargs: dict[str, any]) -> str:
×
1800
        """
1801
        :param dict RoleDefinition: role definition document
1802
        :param str PolicyArn: policy ARN
1803
        :param str RoleName: role name (autogenerated if omitted)
1804
        :return: role ARN
1805
        """
1806
        if "RoleName" not in kwargs:
×
1807
            kwargs["RoleName"] = f"test-role-{short_uid()}"
×
1808

1809
        role = kwargs["RoleName"]
×
1810
        role_policy = json.dumps(kwargs["RoleDefinition"])
×
1811

1812
        result = aws_client.iam.create_role(RoleName=role, AssumeRolePolicyDocument=role_policy)
×
1813
        role_arn = result["Role"]["Arn"]
×
1814

1815
        policy_arn = kwargs["PolicyArn"]
×
1816
        aws_client.iam.attach_role_policy(PolicyArn=policy_arn, RoleName=role)
×
1817

1818
        roles.append(role)
×
1819
        return role_arn
×
1820

1821
    yield _inner
×
1822

1823
    for role in roles:
×
1824
        try:
×
1825
            aws_client.iam.delete_role(RoleName=role)
×
1826
        except Exception as exc:
×
1827
            LOG.debug("Error deleting IAM role '%s': %s", role, exc)
×
1828

1829

1830
@pytest.fixture
1✔
1831
def create_iam_role_with_policy(aws_client):
1✔
1832
    """
1833
    Fixture that creates an IAM role with given role definition and policy definition.
1834
    """
1835
    roles = {}
1✔
1836

1837
    def _create_role_and_policy(**kwargs: dict[str, any]) -> str:
1✔
1838
        """
1839
        :param dict RoleDefinition: role definition document
1840
        :param dict PolicyDefinition: policy definition document
1841
        :param str PolicyName: policy name (autogenerated if omitted)
1842
        :param str RoleName: role name (autogenerated if omitted)
1843
        :return: role ARN
1844
        """
1845
        if "RoleName" not in kwargs:
1✔
1846
            kwargs["RoleName"] = f"test-role-{short_uid()}"
1✔
1847
        role = kwargs["RoleName"]
1✔
1848
        if "PolicyName" not in kwargs:
1✔
1849
            kwargs["PolicyName"] = f"test-policy-{short_uid()}"
1✔
1850
        policy = kwargs["PolicyName"]
1✔
1851
        role_policy = json.dumps(kwargs["RoleDefinition"])
1✔
1852

1853
        result = aws_client.iam.create_role(RoleName=role, AssumeRolePolicyDocument=role_policy)
1✔
1854
        role_arn = result["Role"]["Arn"]
1✔
1855

1856
        policy_document = json.dumps(kwargs["PolicyDefinition"])
1✔
1857
        aws_client.iam.put_role_policy(
1✔
1858
            RoleName=role, PolicyName=policy, PolicyDocument=policy_document
1859
        )
1860
        roles[role] = policy
1✔
1861
        return role_arn
1✔
1862

1863
    yield _create_role_and_policy
1✔
1864

1865
    for role_name, policy_name in roles.items():
1✔
1866
        try:
1✔
1867
            aws_client.iam.delete_role_policy(RoleName=role_name, PolicyName=policy_name)
1✔
1868
        except Exception as exc:
×
1869
            LOG.debug("Error deleting IAM role policy '%s' '%s': %s", role_name, policy_name, exc)
×
1870
        try:
1✔
1871
            aws_client.iam.delete_role(RoleName=role_name)
1✔
1872
        except Exception as exc:
×
1873
            LOG.debug("Error deleting IAM role '%s': %s", role_name, exc)
×
1874

1875

1876
@pytest.fixture
1✔
1877
def firehose_create_delivery_stream(wait_for_delivery_stream_ready, aws_client):
1✔
1878
    delivery_stream_names = []
1✔
1879

1880
    def _create_delivery_stream(**kwargs):
1✔
1881
        if "DeliveryStreamName" not in kwargs:
1✔
1882
            kwargs["DeliveryStreamName"] = f"test-delivery-stream-{short_uid()}"
×
1883
        delivery_stream_name = kwargs["DeliveryStreamName"]
1✔
1884
        response = aws_client.firehose.create_delivery_stream(**kwargs)
1✔
1885
        delivery_stream_names.append(delivery_stream_name)
1✔
1886
        wait_for_delivery_stream_ready(delivery_stream_name)
1✔
1887
        return response
1✔
1888

1889
    yield _create_delivery_stream
1✔
1890

1891
    for delivery_stream_name in delivery_stream_names:
1✔
1892
        try:
1✔
1893
            aws_client.firehose.delete_delivery_stream(DeliveryStreamName=delivery_stream_name)
1✔
1894
        except Exception:
×
1895
            LOG.info("Failed to delete delivery stream %s", delivery_stream_name)
×
1896

1897

1898
@pytest.fixture
1✔
1899
def ses_configuration_set(aws_client):
1✔
1900
    configuration_set_names = []
1✔
1901

1902
    def factory(name: str) -> None:
1✔
1903
        aws_client.ses.create_configuration_set(
1✔
1904
            ConfigurationSet={
1905
                "Name": name,
1906
            },
1907
        )
1908
        configuration_set_names.append(name)
1✔
1909

1910
    yield factory
1✔
1911

1912
    for configuration_set_name in configuration_set_names:
1✔
1913
        aws_client.ses.delete_configuration_set(ConfigurationSetName=configuration_set_name)
1✔
1914

1915

1916
@pytest.fixture
1✔
1917
def ses_configuration_set_sns_event_destination(aws_client):
1✔
1918
    event_destinations = []
1✔
1919

1920
    def factory(config_set_name: str, event_destination_name: str, topic_arn: str) -> None:
1✔
1921
        aws_client.ses.create_configuration_set_event_destination(
1✔
1922
            ConfigurationSetName=config_set_name,
1923
            EventDestination={
1924
                "Name": event_destination_name,
1925
                "Enabled": True,
1926
                "MatchingEventTypes": ["send", "bounce", "delivery", "open", "click"],
1927
                "SNSDestination": {
1928
                    "TopicARN": topic_arn,
1929
                },
1930
            },
1931
        )
1932
        event_destinations.append((config_set_name, event_destination_name))
1✔
1933

1934
    yield factory
1✔
1935

1936
    for created_config_set_name, created_event_destination_name in event_destinations:
1✔
1937
        aws_client.ses.delete_configuration_set_event_destination(
1✔
1938
            ConfigurationSetName=created_config_set_name,
1939
            EventDestinationName=created_event_destination_name,
1940
        )
1941

1942

1943
@pytest.fixture
1✔
1944
def ses_email_template(aws_client):
1✔
1945
    template_names = []
1✔
1946

1947
    def factory(name: str, contents: str, subject: str = f"Email template {short_uid()}"):
1✔
1948
        aws_client.ses.create_template(
1✔
1949
            Template={
1950
                "TemplateName": name,
1951
                "SubjectPart": subject,
1952
                "TextPart": contents,
1953
            }
1954
        )
1955
        template_names.append(name)
1✔
1956

1957
    yield factory
1✔
1958

1959
    for template_name in template_names:
1✔
1960
        aws_client.ses.delete_template(TemplateName=template_name)
1✔
1961

1962

1963
@pytest.fixture
1✔
1964
def ses_verify_identity(aws_client):
1✔
1965
    identities = []
1✔
1966

1967
    def factory(email_address: str) -> None:
1✔
1968
        aws_client.ses.verify_email_identity(EmailAddress=email_address)
1✔
1969

1970
    yield factory
1✔
1971

1972
    for identity in identities:
1✔
1973
        aws_client.ses.delete_identity(Identity=identity)
×
1974

1975

1976
@pytest.fixture
1✔
1977
def setup_sender_email_address(ses_verify_identity):
1✔
1978
    """
1979
    If the test is running against AWS then assume the email address passed is already
1980
    verified, and passes the given email address through. Otherwise, it generates one random
1981
    email address and verify them.
1982
    """
1983

1984
    def inner(sender_email_address: Optional[str] = None) -> str:
1✔
1985
        if is_aws_cloud():
1✔
1986
            if sender_email_address is None:
×
1987
                raise ValueError(
×
1988
                    "sender_email_address must be specified to run this test against AWS"
1989
                )
1990
        else:
1991
            # overwrite the given parameters with localstack specific ones
1992
            sender_email_address = f"sender-{short_uid()}@example.com"
1✔
1993
            ses_verify_identity(sender_email_address)
1✔
1994

1995
        return sender_email_address
1✔
1996

1997
    return inner
1✔
1998

1999

2000
@pytest.fixture
1✔
2001
def ec2_create_security_group(aws_client):
1✔
2002
    ec2_sgs = []
1✔
2003

2004
    def factory(ports=None, **kwargs):
1✔
2005
        if "GroupName" not in kwargs:
1✔
2006
            kwargs["GroupName"] = f"test-sg-{short_uid()}"
1✔
2007
        security_group = aws_client.ec2.create_security_group(**kwargs)
1✔
2008

2009
        permissions = [
1✔
2010
            {
2011
                "FromPort": port,
2012
                "IpProtocol": "tcp",
2013
                "IpRanges": [{"CidrIp": "0.0.0.0/0"}],
2014
                "ToPort": port,
2015
            }
2016
            for port in ports or []
2017
        ]
2018
        aws_client.ec2.authorize_security_group_ingress(
1✔
2019
            GroupName=kwargs["GroupName"],
2020
            IpPermissions=permissions,
2021
        )
2022

2023
        ec2_sgs.append(security_group["GroupId"])
1✔
2024
        return security_group
1✔
2025

2026
    yield factory
1✔
2027

2028
    for sg_group_id in ec2_sgs:
1✔
2029
        try:
1✔
2030
            aws_client.ec2.delete_security_group(GroupId=sg_group_id)
1✔
2031
        except Exception as e:
×
2032
            LOG.debug("Error cleaning up EC2 security group: %s, %s", sg_group_id, e)
×
2033

2034

2035
@pytest.fixture
1✔
2036
def cleanups():
1✔
2037
    cleanup_fns = []
1✔
2038

2039
    yield cleanup_fns
1✔
2040

2041
    for cleanup_callback in cleanup_fns[::-1]:
1✔
2042
        try:
1✔
2043
            cleanup_callback()
1✔
2044
        except Exception as e:
1✔
2045
            LOG.warning("Failed to execute cleanup", exc_info=e)
1✔
2046

2047

2048
@pytest.fixture(scope="session")
1✔
2049
def account_id(aws_client):
1✔
2050
    if is_aws_cloud() or is_api_enabled("sts"):
1✔
2051
        return aws_client.sts.get_caller_identity()["Account"]
1✔
2052
    else:
2053
        return TEST_AWS_ACCOUNT_ID
×
2054

2055

2056
@pytest.fixture(scope="session")
1✔
2057
def region_name(aws_client):
1✔
2058
    if is_aws_cloud() or is_api_enabled("sts"):
1✔
2059
        return aws_client.sts.meta.region_name
1✔
2060
    else:
2061
        return TEST_AWS_REGION_NAME
×
2062

2063

2064
@pytest.fixture(scope="session")
1✔
2065
def partition(region_name):
1✔
2066
    return get_partition(region_name)
1✔
2067

2068

2069
@pytest.fixture(scope="session")
1✔
2070
def secondary_account_id(secondary_aws_client):
1✔
2071
    if is_aws_cloud() or is_api_enabled("sts"):
1✔
2072
        return secondary_aws_client.sts.get_caller_identity()["Account"]
1✔
2073
    else:
2074
        return SECONDARY_TEST_AWS_ACCOUNT_ID
×
2075

2076

2077
@pytest.fixture(scope="session")
1✔
2078
def secondary_region_name():
1✔
2079
    return SECONDARY_TEST_AWS_REGION_NAME
1✔
2080

2081

2082
@pytest.hookimpl
1✔
2083
def pytest_collection_modifyitems(config: Config, items: list[Item]):
1✔
2084
    only_localstack = pytest.mark.skipif(
1✔
2085
        is_aws_cloud(),
2086
        reason="test only applicable if run against localstack",
2087
    )
2088
    for item in items:
1✔
2089
        for mark in item.iter_markers():
1✔
2090
            if mark.name.endswith("only_localstack"):
1✔
2091
                item.add_marker(only_localstack)
1✔
2092
        if hasattr(item, "fixturenames") and "snapshot" in item.fixturenames:
1✔
2093
            # add a marker that indicates that this test is snapshot validated
2094
            # if it uses the snapshot fixture -> allows selecting only snapshot
2095
            # validated tests in order to capture new snapshots for a whole
2096
            # test file with "-m snapshot_validated"
2097
            item.add_marker("snapshot_validated")
1✔
2098

2099

2100
@pytest.fixture
1✔
2101
def sample_stores() -> AccountRegionBundle:
1✔
2102
    class SampleStore(BaseStore):
1✔
2103
        CROSS_ACCOUNT_ATTR = CrossAccountAttribute(default=list)
1✔
2104
        CROSS_REGION_ATTR = CrossRegionAttribute(default=list)
1✔
2105
        region_specific_attr = LocalAttribute(default=list)
1✔
2106

2107
    return AccountRegionBundle("zzz", SampleStore, validate=False)
1✔
2108

2109

2110
@pytest.fixture
1✔
2111
def create_rest_apigw(aws_client_factory):
1✔
2112
    rest_apis = []
1✔
2113
    retry_boto_config = None
1✔
2114
    if is_aws_cloud():
1✔
2115
        retry_boto_config = botocore.config.Config(
×
2116
            # Api gateway can throttle requests pretty heavily. Leading to potentially undeleted apis
2117
            retries={"max_attempts": 10, "mode": "adaptive"}
2118
        )
2119

2120
    def _create_apigateway_function(**kwargs):
1✔
2121
        client_region_name = kwargs.pop("region_name", None)
1✔
2122
        apigateway_client = aws_client_factory(
1✔
2123
            region_name=client_region_name, config=retry_boto_config
2124
        ).apigateway
2125
        kwargs.setdefault("name", f"api-{short_uid()}")
1✔
2126

2127
        response = apigateway_client.create_rest_api(**kwargs)
1✔
2128
        api_id = response.get("id")
1✔
2129
        rest_apis.append((api_id, client_region_name))
1✔
2130

2131
        return api_id, response.get("name"), response.get("rootResourceId")
1✔
2132

2133
    yield _create_apigateway_function
1✔
2134

2135
    for rest_api_id, _client_region_name in rest_apis:
1✔
2136
        apigateway_client = aws_client_factory(
1✔
2137
            region_name=_client_region_name,
2138
            config=retry_boto_config,
2139
        ).apigateway
2140
        # First, retrieve the usage plans associated with the REST API
2141
        usage_plan_ids = []
1✔
2142
        usage_plans = apigateway_client.get_usage_plans()
1✔
2143
        for item in usage_plans.get("items", []):
1✔
2144
            api_stages = item.get("apiStages", [])
1✔
2145
            usage_plan_ids.extend(
1✔
2146
                item.get("id") for api_stage in api_stages if api_stage.get("apiId") == rest_api_id
2147
            )
2148

2149
        # Then delete the API, as you can't delete the UsagePlan if a stage is associated with it
2150
        with contextlib.suppress(Exception):
1✔
2151
            apigateway_client.delete_rest_api(restApiId=rest_api_id)
1✔
2152

2153
        # finally delete the usage plans and the API Keys linked to it
2154
        for usage_plan_id in usage_plan_ids:
1✔
2155
            usage_plan_keys = apigateway_client.get_usage_plan_keys(usagePlanId=usage_plan_id)
1✔
2156
            for key in usage_plan_keys.get("items", []):
1✔
2157
                apigateway_client.delete_api_key(apiKey=key["id"])
×
2158
            apigateway_client.delete_usage_plan(usagePlanId=usage_plan_id)
1✔
2159

2160

2161
@pytest.fixture
1✔
2162
def create_rest_apigw_openapi(aws_client_factory):
1✔
2163
    rest_apis = []
×
2164

2165
    def _create_apigateway_function(**kwargs):
×
2166
        region_name = kwargs.pop("region_name", None)
×
2167
        apigateway_client = aws_client_factory(region_name=region_name).apigateway
×
2168

2169
        response = apigateway_client.import_rest_api(**kwargs)
×
2170
        api_id = response.get("id")
×
2171
        rest_apis.append((api_id, region_name))
×
2172
        return api_id, response
×
2173

2174
    yield _create_apigateway_function
×
2175

2176
    for rest_api_id, region_name in rest_apis:
×
2177
        with contextlib.suppress(Exception):
×
2178
            apigateway_client = aws_client_factory(region_name=region_name).apigateway
×
2179
            apigateway_client.delete_rest_api(restApiId=rest_api_id)
×
2180

2181

2182
@pytest.fixture
1✔
2183
def appsync_create_api(aws_client):
1✔
2184
    graphql_apis = []
×
2185

2186
    def factory(**kwargs):
×
2187
        if "name" not in kwargs:
×
2188
            kwargs["name"] = f"graphql-api-testing-name-{short_uid()}"
×
2189
        if not kwargs.get("authenticationType"):
×
2190
            kwargs["authenticationType"] = "API_KEY"
×
2191

2192
        result = aws_client.appsync.create_graphql_api(**kwargs)["graphqlApi"]
×
2193
        graphql_apis.append(result["apiId"])
×
2194
        return result
×
2195

2196
    yield factory
×
2197

2198
    for api in graphql_apis:
×
2199
        try:
×
2200
            aws_client.appsync.delete_graphql_api(apiId=api)
×
2201
        except Exception as e:
×
2202
            LOG.debug("Error cleaning up AppSync API: %s, %s", api, e)
×
2203

2204

2205
@pytest.fixture
1✔
2206
def assert_host_customisation(monkeypatch):
1✔
2207
    localstack_host = "foo.bar"
1✔
2208
    monkeypatch.setattr(
1✔
2209
        config, "LOCALSTACK_HOST", config.HostAndPort(host=localstack_host, port=8888)
2210
    )
2211

2212
    def asserter(
1✔
2213
        url: str,
2214
        *,
2215
        custom_host: Optional[str] = None,
2216
    ):
2217
        if custom_host is not None:
1✔
2218
            assert custom_host in url, f"Could not find `{custom_host}` in `{url}`"
×
2219

2220
            assert localstack_host not in url
×
2221
        else:
2222
            assert localstack_host in url, f"Could not find `{localstack_host}` in `{url}`"
1✔
2223

2224
    yield asserter
1✔
2225

2226

2227
@pytest.fixture
1✔
2228
def echo_http_server(httpserver: HTTPServer):
1✔
2229
    """Spins up a local HTTP echo server and returns the endpoint URL"""
2230

2231
    def _echo(request: Request) -> Response:
1✔
2232
        request_json = None
1✔
2233
        if request.is_json:
1✔
2234
            with contextlib.suppress(ValueError):
1✔
2235
                request_json = json.loads(request.data)
1✔
2236
        result = {
1✔
2237
            "data": request.data or "{}",
2238
            "headers": dict(request.headers),
2239
            "url": request.url,
2240
            "method": request.method,
2241
            "json": request_json,
2242
        }
2243
        response_body = json.dumps(json_safe(result))
1✔
2244
        return Response(response_body, status=200)
1✔
2245

2246
    httpserver.expect_request("").respond_with_handler(_echo)
1✔
2247
    http_endpoint = httpserver.url_for("/")
1✔
2248

2249
    return http_endpoint
1✔
2250

2251

2252
@pytest.fixture
1✔
2253
def echo_http_server_post(echo_http_server):
1✔
2254
    """
2255
    Returns an HTTP echo server URL for POST requests that work both locally and for parity tests (against real AWS)
2256
    """
2257
    if is_aws_cloud():
1✔
2258
        return f"{PUBLIC_HTTP_ECHO_SERVER_URL}/post"
×
2259

2260
    return f"{echo_http_server}post"
1✔
2261

2262

2263
def create_policy_doc(effect: str, actions: List, resource=None) -> Dict:
1✔
2264
    actions = ensure_list(actions)
1✔
2265
    resource = resource or "*"
1✔
2266
    return {
1✔
2267
        "Version": "2012-10-17",
2268
        "Statement": [
2269
            {
2270
                # TODO statement ids have to be alphanumeric [0-9A-Za-z], write a test for it
2271
                "Sid": f"s{short_uid()}",
2272
                "Effect": effect,
2273
                "Action": actions,
2274
                "Resource": resource,
2275
            }
2276
        ],
2277
    }
2278

2279

2280
@pytest.fixture
1✔
2281
def create_policy_generated_document(create_policy):
1✔
2282
    def _create_policy_with_doc(effect, actions, policy_name=None, resource=None, iam_client=None):
1✔
2283
        policy_name = policy_name or f"p-{short_uid()}"
1✔
2284
        policy = create_policy_doc(effect, actions, resource=resource)
1✔
2285
        response = create_policy(
1✔
2286
            PolicyName=policy_name, PolicyDocument=json.dumps(policy), iam_client=iam_client
2287
        )
2288
        policy_arn = response["Policy"]["Arn"]
1✔
2289
        return policy_arn
1✔
2290

2291
    return _create_policy_with_doc
1✔
2292

2293

2294
@pytest.fixture
1✔
2295
def create_role_with_policy(create_role, create_policy_generated_document, aws_client):
1✔
2296
    def _create_role_with_policy(
1✔
2297
        effect, actions, assume_policy_doc, resource=None, attach=True, iam_client=None
2298
    ):
2299
        iam_client = iam_client or aws_client.iam
1✔
2300

2301
        role_name = f"role-{short_uid()}"
1✔
2302
        result = create_role(
1✔
2303
            RoleName=role_name, AssumeRolePolicyDocument=assume_policy_doc, iam_client=iam_client
2304
        )
2305
        role_arn = result["Role"]["Arn"]
1✔
2306
        policy_name = f"p-{short_uid()}"
1✔
2307

2308
        if attach:
1✔
2309
            # create role and attach role policy
2310
            policy_arn = create_policy_generated_document(
1✔
2311
                effect, actions, policy_name=policy_name, resource=resource, iam_client=iam_client
2312
            )
2313
            iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
1✔
2314
        else:
2315
            # put role policy
2316
            policy_document = create_policy_doc(effect, actions, resource=resource)
1✔
2317
            policy_document = json.dumps(policy_document)
1✔
2318
            iam_client.put_role_policy(
1✔
2319
                RoleName=role_name, PolicyName=policy_name, PolicyDocument=policy_document
2320
            )
2321

2322
        return role_name, role_arn
1✔
2323

2324
    return _create_role_with_policy
1✔
2325

2326

2327
@pytest.fixture
1✔
2328
def create_user_with_policy(create_policy_generated_document, create_user, aws_client):
1✔
2329
    def _create_user_with_policy(effect, actions, resource=None):
×
2330
        policy_arn = create_policy_generated_document(effect, actions, resource=resource)
×
2331
        username = f"user-{short_uid()}"
×
2332
        create_user(UserName=username)
×
2333
        aws_client.iam.attach_user_policy(UserName=username, PolicyArn=policy_arn)
×
2334
        keys = aws_client.iam.create_access_key(UserName=username)["AccessKey"]
×
2335
        return username, keys
×
2336

2337
    return _create_user_with_policy
×
2338

2339

2340
@pytest.fixture()
1✔
2341
def register_extension(s3_bucket, aws_client):
1✔
2342
    cfn_client = aws_client.cloudformation
×
2343
    extensions_arns = []
×
2344

2345
    def _register(extension_name, extension_type, artifact_path):
×
2346
        bucket = s3_bucket
×
2347
        key = f"artifact-{short_uid()}"
×
2348

2349
        aws_client.s3.upload_file(artifact_path, bucket, key)
×
2350

2351
        register_response = cfn_client.register_type(
×
2352
            Type=extension_type,
2353
            TypeName=extension_name,
2354
            SchemaHandlerPackage=f"s3://{bucket}/{key}",
2355
        )
2356

2357
        registration_token = register_response["RegistrationToken"]
×
2358
        cfn_client.get_waiter("type_registration_complete").wait(
×
2359
            RegistrationToken=registration_token
2360
        )
2361

2362
        describe_response = cfn_client.describe_type_registration(
×
2363
            RegistrationToken=registration_token
2364
        )
2365

2366
        extensions_arns.append(describe_response["TypeArn"])
×
2367
        cfn_client.set_type_default_version(Arn=describe_response["TypeVersionArn"])
×
2368

2369
        return describe_response
×
2370

2371
    yield _register
×
2372

2373
    for arn in extensions_arns:
×
2374
        versions = cfn_client.list_type_versions(Arn=arn)["TypeVersionSummaries"]
×
2375
        for v in versions:
×
2376
            try:
×
2377
                cfn_client.deregister_type(Arn=v["Arn"])
×
2378
            except Exception:
×
2379
                continue
×
2380
        cfn_client.deregister_type(Arn=arn)
×
2381

2382

2383
@pytest.fixture
1✔
2384
def hosted_zone(aws_client):
1✔
2385
    zone_ids = []
1✔
2386

2387
    def factory(**kwargs):
1✔
2388
        if "CallerReference" not in kwargs:
1✔
2389
            kwargs["CallerReference"] = f"ref-{short_uid()}"
1✔
2390
        response = aws_client.route53.create_hosted_zone(**kwargs)
1✔
2391
        zone_id = response["HostedZone"]["Id"]
1✔
2392
        zone_ids.append(zone_id)
1✔
2393
        return response
1✔
2394

2395
    yield factory
1✔
2396

2397
    for zone_id in zone_ids[::-1]:
1✔
2398
        aws_client.route53.delete_hosted_zone(Id=zone_id)
1✔
2399

2400

2401
@pytest.fixture
1✔
2402
def openapi_validate(monkeypatch):
1✔
2403
    monkeypatch.setattr(config, "OPENAPI_VALIDATE_RESPONSE", "true")
1✔
2404
    monkeypatch.setattr(config, "OPENAPI_VALIDATE_REQUEST", "true")
1✔
2405

2406

2407
@pytest.fixture
1✔
2408
def set_resource_custom_id():
1✔
2409
    set_ids = []
1✔
2410

2411
    def _set_custom_id(resource_identifier: ResourceIdentifier, custom_id):
1✔
2412
        localstack_id_manager.set_custom_id(
1✔
2413
            resource_identifier=resource_identifier, custom_id=custom_id
2414
        )
2415
        set_ids.append(resource_identifier)
1✔
2416

2417
    yield _set_custom_id
1✔
2418

2419
    for resource_identifier in set_ids:
1✔
2420
        localstack_id_manager.unset_custom_id(resource_identifier)
1✔
2421

2422

2423
###############################
2424
# Events (EventBridge) fixtures
2425
###############################
2426

2427

2428
@pytest.fixture
1✔
2429
def events_create_event_bus(aws_client, region_name, account_id):
1✔
2430
    event_bus_names = []
1✔
2431

2432
    def _create_event_bus(**kwargs):
1✔
2433
        if "Name" not in kwargs:
1✔
2434
            kwargs["Name"] = f"test-event-bus-{short_uid()}"
×
2435

2436
        response = aws_client.events.create_event_bus(**kwargs)
1✔
2437
        event_bus_names.append(kwargs["Name"])
1✔
2438
        return response
1✔
2439

2440
    yield _create_event_bus
1✔
2441

2442
    for event_bus_name in event_bus_names:
1✔
2443
        try:
1✔
2444
            response = aws_client.events.list_rules(EventBusName=event_bus_name)
1✔
2445
            rules = [rule["Name"] for rule in response["Rules"]]
1✔
2446

2447
            # Delete all rules for the current event bus
2448
            for rule in rules:
1✔
2449
                try:
1✔
2450
                    response = aws_client.events.list_targets_by_rule(
1✔
2451
                        Rule=rule, EventBusName=event_bus_name
2452
                    )
2453
                    targets = [target["Id"] for target in response["Targets"]]
1✔
2454

2455
                    # Remove all targets for the current rule
2456
                    if targets:
1✔
2457
                        for target in targets:
1✔
2458
                            aws_client.events.remove_targets(
1✔
2459
                                Rule=rule, EventBusName=event_bus_name, Ids=[target]
2460
                            )
2461

2462
                    aws_client.events.delete_rule(Name=rule, EventBusName=event_bus_name)
1✔
2463
                except Exception as e:
×
2464
                    LOG.warning("Failed to delete rule %s: %s", rule, e)
×
2465

2466
            # Delete archives for event bus
2467
            event_source_arn = (
1✔
2468
                f"arn:aws:events:{region_name}:{account_id}:event-bus/{event_bus_name}"
2469
            )
2470
            response = aws_client.events.list_archives(EventSourceArn=event_source_arn)
1✔
2471
            archives = [archive["ArchiveName"] for archive in response["Archives"]]
1✔
2472
            for archive in archives:
1✔
2473
                try:
1✔
2474
                    aws_client.events.delete_archive(ArchiveName=archive)
1✔
2475
                except Exception as e:
×
2476
                    LOG.warning("Failed to delete archive %s: %s", archive, e)
×
2477

2478
            aws_client.events.delete_event_bus(Name=event_bus_name)
1✔
2479
        except Exception as e:
1✔
2480
            LOG.warning("Failed to delete event bus %s: %s", event_bus_name, e)
1✔
2481

2482

2483
@pytest.fixture
1✔
2484
def events_put_rule(aws_client):
1✔
2485
    rules = []
1✔
2486

2487
    def _put_rule(**kwargs):
1✔
2488
        if "Name" not in kwargs:
1✔
2489
            kwargs["Name"] = f"rule-{short_uid()}"
×
2490

2491
        response = aws_client.events.put_rule(**kwargs)
1✔
2492
        rules.append((kwargs["Name"], kwargs.get("EventBusName", "default")))
1✔
2493
        return response
1✔
2494

2495
    yield _put_rule
1✔
2496

2497
    for rule, event_bus_name in rules:
1✔
2498
        try:
1✔
2499
            response = aws_client.events.list_targets_by_rule(
1✔
2500
                Rule=rule, EventBusName=event_bus_name
2501
            )
2502
            targets = [target["Id"] for target in response["Targets"]]
1✔
2503

2504
            # Remove all targets for the current rule
2505
            if targets:
1✔
2506
                for target in targets:
1✔
2507
                    aws_client.events.remove_targets(
1✔
2508
                        Rule=rule, EventBusName=event_bus_name, Ids=[target]
2509
                    )
2510

2511
            aws_client.events.delete_rule(Name=rule, EventBusName=event_bus_name)
1✔
2512
        except Exception as e:
1✔
2513
            LOG.warning("Failed to delete rule %s: %s", rule, e)
1✔
2514

2515

2516
@pytest.fixture
1✔
2517
def events_create_rule(aws_client):
1✔
2518
    rules = []
1✔
2519

2520
    def _create_rule(**kwargs):
1✔
2521
        rule_name = kwargs["Name"]
1✔
2522
        bus_name = kwargs.get("EventBusName", "")
1✔
2523
        pattern = kwargs.get("EventPattern", {})
1✔
2524
        schedule = kwargs.get("ScheduleExpression", "")
1✔
2525
        rule_arn = aws_client.events.put_rule(
1✔
2526
            Name=rule_name,
2527
            EventBusName=bus_name,
2528
            EventPattern=json.dumps(pattern),
2529
            ScheduleExpression=schedule,
2530
        )["RuleArn"]
2531
        rules.append({"name": rule_name, "bus": bus_name})
1✔
2532
        return rule_arn
1✔
2533

2534
    yield _create_rule
1✔
2535

2536
    for rule in rules:
1✔
2537
        targets = aws_client.events.list_targets_by_rule(
1✔
2538
            Rule=rule["name"], EventBusName=rule["bus"]
2539
        )["Targets"]
2540

2541
        targetIds = [target["Id"] for target in targets]
1✔
2542
        if len(targetIds) > 0:
1✔
2543
            aws_client.events.remove_targets(
1✔
2544
                Rule=rule["name"], EventBusName=rule["bus"], Ids=targetIds
2545
            )
2546

2547
        aws_client.events.delete_rule(Name=rule["name"], EventBusName=rule["bus"])
1✔
2548

2549

2550
@pytest.fixture
1✔
2551
def sqs_as_events_target(aws_client, sqs_get_queue_arn):
1✔
2552
    queue_urls = []
1✔
2553

2554
    def _sqs_as_events_target(queue_name: str | None = None) -> tuple[str, str]:
1✔
2555
        if not queue_name:
1✔
2556
            queue_name = f"tests-queue-{short_uid()}"
1✔
2557
        sqs_client = aws_client.sqs
1✔
2558
        queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]
1✔
2559
        queue_urls.append(queue_url)
1✔
2560
        queue_arn = sqs_get_queue_arn(queue_url)
1✔
2561
        policy = {
1✔
2562
            "Version": "2012-10-17",
2563
            "Id": f"sqs-eventbridge-{short_uid()}",
2564
            "Statement": [
2565
                {
2566
                    "Sid": f"SendMessage-{short_uid()}",
2567
                    "Effect": "Allow",
2568
                    "Principal": {"Service": "events.amazonaws.com"},
2569
                    "Action": "sqs:SendMessage",
2570
                    "Resource": queue_arn,
2571
                }
2572
            ],
2573
        }
2574
        sqs_client.set_queue_attributes(
1✔
2575
            QueueUrl=queue_url, Attributes={"Policy": json.dumps(policy)}
2576
        )
2577
        return queue_url, queue_arn
1✔
2578

2579
    yield _sqs_as_events_target
1✔
2580

2581
    for queue_url in queue_urls:
1✔
2582
        try:
1✔
2583
            aws_client.sqs.delete_queue(QueueUrl=queue_url)
1✔
2584
        except Exception as e:
×
2585
            LOG.debug("error cleaning up queue %s: %s", queue_url, e)
×
2586

2587

2588
@pytest.fixture
1✔
2589
def clean_up(
1✔
2590
    aws_client,
2591
):  # TODO: legacy clean up fixtures for eventbridge - remove and use individual fixtures for creating resources instead
2592
    def _clean_up(
1✔
2593
        bus_name=None,
2594
        rule_name=None,
2595
        target_ids=None,
2596
        queue_url=None,
2597
        log_group_name=None,
2598
    ):
2599
        events_client = aws_client.events
1✔
2600
        kwargs = {"EventBusName": bus_name} if bus_name else {}
1✔
2601
        if target_ids:
1✔
2602
            target_ids = target_ids if isinstance(target_ids, list) else [target_ids]
1✔
2603
            call_safe(
1✔
2604
                events_client.remove_targets,
2605
                kwargs=dict(Rule=rule_name, Ids=target_ids, Force=True, **kwargs),
2606
            )
2607
        if rule_name:
1✔
2608
            call_safe(events_client.delete_rule, kwargs=dict(Name=rule_name, Force=True, **kwargs))
1✔
2609
        if bus_name:
1✔
2610
            call_safe(events_client.delete_event_bus, kwargs=dict(Name=bus_name))
×
2611
        if queue_url:
1✔
2612
            sqs_client = aws_client.sqs
×
2613
            call_safe(sqs_client.delete_queue, kwargs=dict(QueueUrl=queue_url))
×
2614
        if log_group_name:
1✔
2615
            logs_client = aws_client.logs
×
2616

2617
            def _delete_log_group():
×
2618
                log_streams = logs_client.describe_log_streams(logGroupName=log_group_name)
×
2619
                for log_stream in log_streams["logStreams"]:
×
2620
                    logs_client.delete_log_stream(
×
2621
                        logGroupName=log_group_name, logStreamName=log_stream["logStreamName"]
2622
                    )
2623
                logs_client.delete_log_group(logGroupName=log_group_name)
×
2624

2625
            call_safe(_delete_log_group)
×
2626

2627
    yield _clean_up
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc