• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 18505123992

14 Oct 2025 05:30PM UTC coverage: 86.888% (-0.01%) from 86.899%
18505123992

push

github

web-flow
S3: fix `aws-global` validation in CreateBucket (#13250)

10 of 10 new or added lines in 4 files covered. (100.0%)

831 existing lines in 40 files now uncovered.

68028 of 78294 relevant lines covered (86.89%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

38.96
/localstack-core/localstack/utils/aws/message_forwarding.py
1
import base64
1✔
2
import json
1✔
3
import logging
1✔
4
import re
1✔
5
import uuid
1✔
6

7
from moto.events.models import events_backends
1✔
8

9
from localstack.aws.connect import connect_to
1✔
10
from localstack.services.apigateway.legacy.helpers import extract_query_string_params
1✔
11
from localstack.utils import collections
1✔
12
from localstack.utils.aws.arns import (
1✔
13
    extract_account_id_from_arn,
14
    extract_region_from_arn,
15
    firehose_name,
16
    sqs_queue_url_for_arn,
17
)
18
from localstack.utils.http import add_path_parameters_to_url, add_query_params_to_url
1✔
19
from localstack.utils.http import safe_requests as requests
1✔
20
from localstack.utils.strings import to_bytes, to_str
1✔
21
from localstack.utils.time import now_utc
1✔
22

23
LOG = logging.getLogger(__name__)
1✔
24

25
AUTH_BASIC = "BASIC"
1✔
26
AUTH_API_KEY = "API_KEY"
1✔
27
AUTH_OAUTH = "OAUTH_CLIENT_CREDENTIALS"
1✔
28

29

30
# TODO: refactor/split this. too much here is service specific
31
def send_event_to_target(
1✔
32
    target_arn: str,
33
    event: dict,
34
    target_attributes: dict = None,
35
    asynchronous: bool = True,
36
    target: dict = None,
37
    role: str = None,
38
    source_arn: str = None,
39
    source_service: str = None,
40
    events_source: str = None,  # optional data for publishing to EventBridge
41
    events_detail_type: str = None,  # optional data for publishing to EventBridge
42
):
43
    region = extract_region_from_arn(target_arn)
1✔
44
    account_id = extract_account_id_from_arn(source_arn)
1✔
45

46
    if target is None:
1✔
47
        target = {}
1✔
48
    if role:
1✔
49
        clients = connect_to.with_assumed_role(
1✔
50
            role_arn=role, service_principal=source_service, region_name=region
51
        )
52
    else:
UNCOV
53
        clients = connect_to(aws_access_key_id=account_id, region_name=region)
×
54

55
    if ":lambda:" in target_arn:
1✔
UNCOV
56
        lambda_client = clients.lambda_.request_metadata(
×
57
            service_principal=source_service, source_arn=source_arn
58
        )
UNCOV
59
        lambda_client.invoke(
×
60
            FunctionName=target_arn,
61
            Payload=to_bytes(json.dumps(event)),
62
            InvocationType="Event" if asynchronous else "RequestResponse",
63
        )
64

65
    elif ":sns:" in target_arn:
1✔
66
        sns_client = clients.sns.request_metadata(
1✔
67
            service_principal=source_service, source_arn=source_arn
68
        )
69
        sns_client.publish(TopicArn=target_arn, Message=json.dumps(event))
1✔
70

71
    elif ":sqs:" in target_arn:
1✔
72
        sqs_client = clients.sqs.request_metadata(
1✔
73
            service_principal=source_service, source_arn=source_arn
74
        )
75
        queue_url = sqs_queue_url_for_arn(target_arn)
1✔
76
        msg_group_id = collections.get_safe(target_attributes, "$.SqsParameters.MessageGroupId")
1✔
77
        kwargs = {"MessageGroupId": msg_group_id} if msg_group_id else {}
1✔
78
        sqs_client.send_message(
1✔
79
            QueueUrl=queue_url, MessageBody=json.dumps(event, separators=(",", ":")), **kwargs
80
        )
81

82
    elif ":states:" in target_arn:
1✔
UNCOV
83
        account_id = extract_account_id_from_arn(target_arn)
×
84
        stepfunctions_client = connect_to(
×
85
            aws_access_key_id=account_id, region_name=region
86
        ).stepfunctions
UNCOV
87
        stepfunctions_client.start_execution(stateMachineArn=target_arn, input=json.dumps(event))
×
88

89
    elif ":firehose:" in target_arn:
1✔
UNCOV
90
        delivery_stream_name = firehose_name(target_arn)
×
91
        firehose_client = clients.firehose.request_metadata(
×
92
            service_principal=source_service, source_arn=source_arn
93
        )
UNCOV
94
        firehose_client.put_record(
×
95
            DeliveryStreamName=delivery_stream_name,
96
            Record={"Data": to_bytes(json.dumps(event))},
97
        )
98

99
    elif ":events:" in target_arn:
1✔
100
        if ":api-destination/" in target_arn or ":destination/" in target_arn:
1✔
UNCOV
101
            send_event_to_api_destination(target_arn, event, target.get("HttpParameters"))
×
102

103
        else:
104
            events_client = clients.events.request_metadata(
1✔
105
                service_principal=source_service, source_arn=source_arn
106
            )
107
            eventbus_name = target_arn.split(":")[-1].split("/")[-1]
1✔
108
            detail = event.get("detail") or event
1✔
109
            resources = event.get("resources") or [source_arn] if source_arn else []
1✔
110
            events_client.put_events(
1✔
111
                Entries=[
112
                    {
113
                        "EventBusName": eventbus_name,
114
                        "Source": events_source or event.get("source", source_service) or "",
115
                        "DetailType": events_detail_type or event.get("detail-type", ""),
116
                        "Detail": json.dumps(detail),
117
                        "Resources": resources,
118
                    }
119
                ]
120
            )
121

UNCOV
122
    elif ":kinesis:" in target_arn:
×
123
        partition_key_path = collections.get_safe(
×
124
            target_attributes,
125
            "$.KinesisParameters.PartitionKeyPath",
126
            default_value="$.id",
127
        )
128

UNCOV
129
        stream_name = target_arn.split("/")[-1]
×
130
        partition_key = collections.get_safe(event, partition_key_path, event["id"])
×
131
        kinesis_client = clients.kinesis.request_metadata(
×
132
            service_principal=source_service, source_arn=source_arn
133
        )
134

UNCOV
135
        kinesis_client.put_record(
×
136
            StreamName=stream_name,
137
            Data=to_bytes(json.dumps(event)),
138
            PartitionKey=partition_key,
139
        )
140

UNCOV
141
    elif ":logs:" in target_arn:
×
142
        log_group_name = target_arn.split(":")[6]
×
143
        logs_client = clients.logs.request_metadata(
×
144
            service_principal=source_service, source_arn=source_arn
145
        )
UNCOV
146
        log_stream_name = str(uuid.uuid4())
×
147
        logs_client.create_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name)
×
148
        logs_client.put_log_events(
×
149
            logGroupName=log_group_name,
150
            logStreamName=log_stream_name,
151
            logEvents=[{"timestamp": now_utc(millis=True), "message": json.dumps(event)}],
152
        )
153
    else:
UNCOV
154
        LOG.warning('Unsupported Events rule target ARN: "%s"', target_arn)
×
155

156

157
def auth_keys_from_connection(connection: dict):
1✔
UNCOV
158
    headers = {}
×
159

UNCOV
160
    auth_type = connection.get("AuthorizationType").upper()
×
161
    auth_parameters = connection.get("AuthParameters")
×
162
    if auth_type == AUTH_BASIC:
×
163
        basic_auth_parameters = auth_parameters.get("BasicAuthParameters", {})
×
164
        username = basic_auth_parameters.get("Username", "")
×
165
        password = basic_auth_parameters.get("Password", "")
×
166
        auth = "Basic " + to_str(base64.b64encode(f"{username}:{password}".encode("ascii")))
×
167
        headers.update({"authorization": auth})
×
168

UNCOV
169
    if auth_type == AUTH_API_KEY:
×
170
        api_key_parameters = auth_parameters.get("ApiKeyAuthParameters", {})
×
171
        api_key_name = api_key_parameters.get("ApiKeyName", "")
×
172
        api_key_value = api_key_parameters.get("ApiKeyValue", "")
×
173
        headers.update({api_key_name: api_key_value})
×
174

UNCOV
175
    if auth_type == AUTH_OAUTH:
×
176
        oauth_parameters = auth_parameters.get("OAuthParameters", {})
×
177
        oauth_method = oauth_parameters.get("HttpMethod")
×
178

UNCOV
179
        oauth_http_parameters = oauth_parameters.get("OAuthHttpParameters", {})
×
180
        oauth_endpoint = oauth_parameters.get("AuthorizationEndpoint", "")
×
181
        query_object = list_of_parameters_to_object(
×
182
            oauth_http_parameters.get("QueryStringParameters", [])
183
        )
UNCOV
184
        oauth_endpoint = add_query_params_to_url(oauth_endpoint, query_object)
×
185

UNCOV
186
        client_parameters = oauth_parameters.get("ClientParameters", {})
×
187
        client_id = client_parameters.get("ClientID", "")
×
188
        client_secret = client_parameters.get("ClientSecret", "")
×
189

UNCOV
190
        oauth_body = list_of_parameters_to_object(oauth_http_parameters.get("BodyParameters", []))
×
191
        oauth_body.update({"client_id": client_id, "client_secret": client_secret})
×
192

UNCOV
193
        oauth_header = list_of_parameters_to_object(
×
194
            oauth_http_parameters.get("HeaderParameters", [])
195
        )
UNCOV
196
        oauth_result = requests.request(
×
197
            method=oauth_method,
198
            url=oauth_endpoint,
199
            data=json.dumps(oauth_body),
200
            headers=oauth_header,
201
        )
UNCOV
202
        oauth_data = json.loads(oauth_result.text)
×
203

UNCOV
204
        token_type = oauth_data.get("token_type", "")
×
205
        access_token = oauth_data.get("access_token", "")
×
206
        auth_header = f"{token_type} {access_token}"
×
207
        headers.update({"authorization": auth_header})
×
208

UNCOV
209
    return headers
×
210

211

212
def list_of_parameters_to_object(items):
1✔
213
    return {item.get("Key"): item.get("Value") for item in items}
1✔
214

215

216
def send_event_to_api_destination(target_arn, event, http_parameters: dict | None = None):
1✔
217
    """Send an event to an EventBridge API destination
218
    See https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-api-destinations.html"""
219

220
    # ARN format: ...:api-destination/{name}/{uuid}
UNCOV
221
    account_id = extract_account_id_from_arn(target_arn)
×
222
    region = extract_region_from_arn(target_arn)
×
223

UNCOV
224
    api_destination_name = target_arn.split(":")[-1].split("/")[1]
×
225
    events_client = connect_to(aws_access_key_id=account_id, region_name=region).events
×
226
    destination = events_client.describe_api_destination(Name=api_destination_name)
×
227

228
    # get destination endpoint details
UNCOV
229
    method = destination.get("HttpMethod", "GET")
×
230
    endpoint = destination.get("InvocationEndpoint")
×
231
    state = destination.get("ApiDestinationState") or "ACTIVE"
×
232

UNCOV
233
    LOG.debug('Calling EventBridge API destination (state "%s"): %s %s', state, method, endpoint)
×
234
    headers = {
×
235
        # default headers AWS sends with every api destination call
236
        "User-Agent": "Amazon/EventBridge/ApiDestinations",
237
        "Content-Type": "application/json; charset=utf-8",
238
        "Range": "bytes=0-1048575",
239
        "Accept-Encoding": "gzip,deflate",
240
        "Connection": "close",
241
    }
242

UNCOV
243
    endpoint = add_api_destination_authorization(destination, headers, event)
×
244
    if http_parameters:
×
245
        endpoint = add_target_http_parameters(http_parameters, endpoint, headers, event)
×
246

UNCOV
247
    result = requests.request(
×
248
        method=method, url=endpoint, data=json.dumps(event or {}), headers=headers
249
    )
UNCOV
250
    if result.status_code >= 400:
×
251
        LOG.debug("Received code %s forwarding events: %s %s", result.status_code, method, endpoint)
×
252
        if result.status_code == 429 or 500 <= result.status_code <= 600:
×
253
            pass  # TODO: retry logic (only retry on 429 and 5xx response status)
×
254

255

256
def add_api_destination_authorization(destination, headers, event):
1✔
UNCOV
257
    connection_arn = destination.get("ConnectionArn", "")
×
258
    connection_name = re.search(r"connection\/([a-zA-Z0-9-_]+)\/", connection_arn).group(1)
×
259

UNCOV
260
    account_id = extract_account_id_from_arn(connection_arn)
×
261
    region = extract_region_from_arn(connection_arn)
×
262

263
    # Using backend directly due to boto hiding passwords, keys and secret values
UNCOV
264
    event_backend = events_backends[account_id][region]
×
265
    connection = event_backend.describe_connection(name=connection_name)
×
266

UNCOV
267
    headers.update(auth_keys_from_connection(connection))
×
268

UNCOV
269
    auth_parameters = connection.get("AuthParameters", {})
×
270
    invocation_parameters = auth_parameters.get("InvocationHttpParameters")
×
271

UNCOV
272
    endpoint = destination.get("InvocationEndpoint")
×
273
    if invocation_parameters:
×
274
        header_parameters = list_of_parameters_to_object(
×
275
            invocation_parameters.get("HeaderParameters", [])
276
        )
UNCOV
277
        headers.update(header_parameters)
×
278

UNCOV
279
        body_parameters = list_of_parameters_to_object(
×
280
            invocation_parameters.get("BodyParameters", [])
281
        )
UNCOV
282
        event.update(body_parameters)
×
283

UNCOV
284
        query_parameters = invocation_parameters.get("QueryStringParameters", [])
×
285
        query_object = list_of_parameters_to_object(query_parameters)
×
286
        endpoint = add_query_params_to_url(endpoint, query_object)
×
287

UNCOV
288
    return endpoint
×
289

290

291
def add_target_http_parameters(http_parameters: dict, endpoint: str, headers: dict, body):
1✔
292
    endpoint = add_path_parameters_to_url(endpoint, http_parameters.get("PathParameterValues", []))
1✔
293

294
    # The request should prioritze connection header/query parameters over target params if there is an overlap
295
    query_params = http_parameters.get("QueryStringParameters", {})
1✔
296
    prev_query_params = extract_query_string_params(endpoint)[1]
1✔
297
    query_params.update(prev_query_params)
1✔
298
    endpoint = add_query_params_to_url(endpoint, query_params)
1✔
299

300
    target_headers = http_parameters.get("HeaderParameters", {})
1✔
301
    for target_header in target_headers:
1✔
302
        if target_header not in headers:
1✔
303
            headers.update({target_header: target_headers.get(target_header)})
1✔
304

305
    return endpoint
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc