• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 5c4ca713-fcdb-4e24-ad10-bfabba4e9639

06 May 2025 02:39PM UTC coverage: 86.545% (-0.01%) from 86.558%
5c4ca713-fcdb-4e24-ad10-bfabba4e9639

push

circleci

web-flow
Events: add classmethod to recreate services (#12566)

7 of 13 new or added lines in 2 files covered. (53.85%)

41 existing lines in 14 files now uncovered.

64186 of 74165 relevant lines covered (86.54%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.13
/localstack-core/localstack/services/events/api_destination.py
1
import base64
1✔
2
import json
1✔
3
import logging
1✔
4
import re
1✔
5

6
import requests
1✔
7

8
from localstack.aws.api.events import (
1✔
9
    ApiDestinationDescription,
10
    ApiDestinationHttpMethod,
11
    ApiDestinationInvocationRateLimitPerSecond,
12
    ApiDestinationName,
13
    ApiDestinationState,
14
    Arn,
15
    ConnectionArn,
16
    ConnectionAuthorizationType,
17
    ConnectionState,
18
    HttpsEndpoint,
19
    Timestamp,
20
)
21
from localstack.aws.connect import connect_to
1✔
22
from localstack.services.events.models import ApiDestination, Connection, ValidationException
1✔
23
from localstack.utils.aws.arns import (
1✔
24
    extract_account_id_from_arn,
25
    extract_region_from_arn,
26
    parse_arn,
27
)
28
from localstack.utils.aws.message_forwarding import (
1✔
29
    list_of_parameters_to_object,
30
)
31
from localstack.utils.http import add_query_params_to_url
1✔
32
from localstack.utils.strings import to_str
1✔
33

34
VALID_AUTH_TYPES = [t.value for t in ConnectionAuthorizationType]
1✔
35
LOG = logging.getLogger(__name__)
1✔
36

37

38
class APIDestinationService:
1✔
39
    def __init__(
1✔
40
        self,
41
        name: ApiDestinationName,
42
        region: str,
43
        account_id: str,
44
        connection_arn: ConnectionArn,
45
        connection: Connection,
46
        invocation_endpoint: HttpsEndpoint,
47
        http_method: ApiDestinationHttpMethod,
48
        invocation_rate_limit_per_second: ApiDestinationInvocationRateLimitPerSecond | None,
49
        description: ApiDestinationDescription | None = None,
50
    ):
51
        self.validate_input(name, connection_arn, http_method, invocation_endpoint)
1✔
52
        self.connection = connection
1✔
53
        state = self._get_state()
1✔
54

55
        self.api_destination = ApiDestination(
1✔
56
            name,
57
            region,
58
            account_id,
59
            connection_arn,
60
            invocation_endpoint,
61
            http_method,
62
            state,
63
            invocation_rate_limit_per_second,
64
            description,
65
        )
66

67
    @classmethod
1✔
68
    def restore_from_api_destination_and_connection(
1✔
69
        cls, api_destination: ApiDestination, connection: Connection
70
    ):
NEW
71
        api_destination_service = cls(
×
72
            name=api_destination.name,
73
            region=api_destination.region,
74
            account_id=api_destination.account_id,
75
            connection_arn=api_destination.connection_arn,
76
            connection=connection,
77
            invocation_endpoint=api_destination.invocation_endpoint,
78
            http_method=api_destination.http_method,
79
            invocation_rate_limit_per_second=api_destination.invocation_rate_limit_per_second,
80
        )
NEW
81
        api_destination_service.api_destination = api_destination
×
NEW
82
        return api_destination_service
×
83

84
    @property
1✔
85
    def arn(self) -> Arn:
1✔
86
        return self.api_destination.arn
1✔
87

88
    @property
1✔
89
    def state(self) -> ApiDestinationState:
1✔
90
        return self.api_destination.state
1✔
91

92
    @property
1✔
93
    def creation_time(self) -> Timestamp:
1✔
94
        return self.api_destination.creation_time
1✔
95

96
    @property
1✔
97
    def last_modified_time(self) -> Timestamp:
1✔
98
        return self.api_destination.last_modified_time
1✔
99

100
    def set_state(self, state: ApiDestinationState) -> None:
1✔
101
        if hasattr(self, "api_destination"):
1✔
102
            if state == ApiDestinationState.ACTIVE:
1✔
103
                state = self._get_state()
1✔
104
            self.api_destination.state = state
1✔
105

106
    def update(
1✔
107
        self,
108
        connection,
109
        invocation_endpoint,
110
        http_method,
111
        invocation_rate_limit_per_second,
112
        description,
113
    ):
114
        self.set_state(ApiDestinationState.INACTIVE)
1✔
115
        self.connection = connection
1✔
116
        self.api_destination.connection_arn = connection.arn
1✔
117
        if invocation_endpoint:
1✔
118
            self.api_destination.invocation_endpoint = invocation_endpoint
1✔
119
        if http_method:
1✔
120
            self.api_destination.http_method = http_method
1✔
121
        if invocation_rate_limit_per_second:
1✔
UNCOV
122
            self.api_destination.invocation_rate_limit_per_second = invocation_rate_limit_per_second
×
123
        if description:
1✔
124
            self.api_destination.description = description
1✔
125
        self.api_destination.last_modified_time = Timestamp.now()
1✔
126
        self.set_state(ApiDestinationState.ACTIVE)
1✔
127

128
    def _get_state(self) -> ApiDestinationState:
1✔
129
        """Determine ApiDestinationState based on ConnectionState."""
130
        return (
1✔
131
            ApiDestinationState.ACTIVE
132
            if self.connection.state == ConnectionState.AUTHORIZED
133
            else ApiDestinationState.INACTIVE
134
        )
135

136
    @classmethod
1✔
137
    def validate_input(
1✔
138
        cls,
139
        name: ApiDestinationName,
140
        connection_arn: ConnectionArn,
141
        http_method: ApiDestinationHttpMethod,
142
        invocation_endpoint: HttpsEndpoint,
143
    ) -> None:
144
        errors = []
1✔
145
        errors.extend(cls._validate_api_destination_name(name))
1✔
146
        errors.extend(cls._validate_connection_arn(connection_arn))
1✔
147
        errors.extend(cls._validate_http_method(http_method))
1✔
148
        errors.extend(cls._validate_invocation_endpoint(invocation_endpoint))
1✔
149

150
        if errors:
1✔
151
            error_message = (
1✔
152
                f"{len(errors)} validation error{'s' if len(errors) > 1 else ''} detected: "
153
            )
154
            error_message += "; ".join(errors)
1✔
155
            raise ValidationException(error_message)
1✔
156

157
    @staticmethod
1✔
158
    def _validate_api_destination_name(name: str) -> list[str]:
1✔
159
        """Validate the API destination name according to AWS rules. Returns a list of validation errors."""
160
        errors = []
1✔
161
        if not re.match(r"^[\.\-_A-Za-z0-9]+$", name):
1✔
162
            errors.append(
1✔
163
                f"Value '{name}' at 'name' failed to satisfy constraint: "
164
                "Member must satisfy regular expression pattern: [\\.\\-_A-Za-z0-9]+"
165
            )
166
        if not (1 <= len(name) <= 64):
1✔
UNCOV
167
            errors.append(
×
168
                f"Value '{name}' at 'name' failed to satisfy constraint: "
169
                "Member must have length less than or equal to 64"
170
            )
171
        return errors
1✔
172

173
    @staticmethod
1✔
174
    def _validate_connection_arn(connection_arn: ConnectionArn) -> list[str]:
1✔
175
        errors = []
1✔
176
        if not re.match(
1✔
177
            r"^arn:aws([a-z]|\-)*:events:[a-z0-9\-]+:\d{12}:connection/[\.\-_A-Za-z0-9]+/[\-A-Za-z0-9]+$",
178
            connection_arn,
179
        ):
180
            errors.append(
1✔
181
                f"Value '{connection_arn}' at 'connectionArn' failed to satisfy constraint: "
182
                "Member must satisfy regular expression pattern: "
183
                "^arn:aws([a-z]|\\-)*:events:([a-z]|\\d|\\-)*:([0-9]{12})?:connection\\/[\\.\\-_A-Za-z0-9]+\\/[\\-A-Za-z0-9]+$"
184
            )
185
        return errors
1✔
186

187
    @staticmethod
1✔
188
    def _validate_http_method(http_method: ApiDestinationHttpMethod) -> list[str]:
1✔
189
        errors = []
1✔
190
        allowed_methods = ["HEAD", "POST", "PATCH", "DELETE", "PUT", "GET", "OPTIONS"]
1✔
191
        if http_method not in allowed_methods:
1✔
192
            errors.append(
1✔
193
                f"Value '{http_method}' at 'httpMethod' failed to satisfy constraint: "
194
                f"Member must satisfy enum value set: [{', '.join(allowed_methods)}]"
195
            )
196
        return errors
1✔
197

198
    @staticmethod
1✔
199
    def _validate_invocation_endpoint(invocation_endpoint: HttpsEndpoint) -> list[str]:
1✔
200
        errors = []
1✔
201
        endpoint_pattern = r"^((%[0-9A-Fa-f]{2}|[-()_.!~*';/?:@&=+$,A-Za-z0-9])+)([).!';/?:,])?$"
1✔
202
        if not re.match(endpoint_pattern, invocation_endpoint):
1✔
UNCOV
203
            errors.append(
×
204
                f"Value '{invocation_endpoint}' at 'invocationEndpoint' failed to satisfy constraint: "
205
                "Member must satisfy regular expression pattern: "
206
                "^((%[0-9A-Fa-f]{2}|[-()_.!~*';/?:@&=+$,A-Za-z0-9])+)([).!';/?:,])?$"
207
            )
208
        return errors
1✔
209

210

211
ApiDestinationServiceDict = dict[Arn, APIDestinationService]
1✔
212

213

214
def add_api_destination_authorization(destination, headers, event):
1✔
215
    connection_arn = destination.get("ConnectionArn", "")
1✔
216
    connection_name = re.search(r"connection\/([a-zA-Z0-9-_]+)\/", connection_arn).group(1)
1✔
217

218
    account_id = extract_account_id_from_arn(connection_arn)
1✔
219
    region = extract_region_from_arn(connection_arn)
1✔
220

221
    events_client = connect_to(aws_access_key_id=account_id, region_name=region).events
1✔
222
    connection_details = events_client.describe_connection(Name=connection_name)
1✔
223
    secret_arn = connection_details["SecretArn"]
1✔
224
    parsed_arn = parse_arn(secret_arn)
1✔
225
    secretsmanager_client = connect_to(
1✔
226
        aws_access_key_id=parsed_arn["account"], region_name=parsed_arn["region"]
227
    ).secretsmanager
228
    auth_secret = json.loads(
1✔
229
        secretsmanager_client.get_secret_value(SecretId=secret_arn)["SecretString"]
230
    )
231

232
    headers.update(_auth_keys_from_connection(connection_details, auth_secret))
1✔
233

234
    auth_parameters = connection_details.get("AuthParameters", {})
1✔
235
    invocation_parameters = auth_parameters.get("InvocationHttpParameters")
1✔
236

237
    endpoint = destination.get("InvocationEndpoint")
1✔
238
    if invocation_parameters:
1✔
239
        header_parameters = list_of_parameters_to_object(
1✔
240
            invocation_parameters.get("HeaderParameters", [])
241
        )
242
        headers.update(header_parameters)
1✔
243

244
        body_parameters = list_of_parameters_to_object(
1✔
245
            invocation_parameters.get("BodyParameters", [])
246
        )
247
        event.update(body_parameters)
1✔
248

249
        query_parameters = invocation_parameters.get("QueryStringParameters", [])
1✔
250
        query_object = list_of_parameters_to_object(query_parameters)
1✔
251
        endpoint = add_query_params_to_url(endpoint, query_object)
1✔
252

253
    return endpoint
1✔
254

255

256
def _auth_keys_from_connection(connection_details, auth_secret):
1✔
257
    headers = {}
1✔
258

259
    auth_type = connection_details.get("AuthorizationType").upper()
1✔
260
    auth_parameters = connection_details.get("AuthParameters")
1✔
261
    match auth_type:
1✔
262
        case ConnectionAuthorizationType.BASIC:
1✔
263
            username = auth_secret.get("username", "")
1✔
264
            password = auth_secret.get("password", "")
1✔
265
            auth = "Basic " + to_str(base64.b64encode(f"{username}:{password}".encode("ascii")))
1✔
266
            headers.update({"authorization": auth})
1✔
267

268
        case ConnectionAuthorizationType.API_KEY:
1✔
269
            api_key_name = auth_secret.get("api_key_name", "")
1✔
270
            api_key_value = auth_secret.get("api_key_value", "")
1✔
271
            headers.update({api_key_name: api_key_value})
1✔
272

273
        case ConnectionAuthorizationType.OAUTH_CLIENT_CREDENTIALS:
1✔
274
            oauth_parameters = auth_parameters.get("OAuthParameters", {})
1✔
275
            oauth_method = auth_secret.get("http_method")
1✔
276

277
            oauth_http_parameters = oauth_parameters.get("OAuthHttpParameters", {})
1✔
278
            oauth_endpoint = auth_secret.get("authorization_endpoint", "")
1✔
279
            query_object = list_of_parameters_to_object(
1✔
280
                oauth_http_parameters.get("QueryStringParameters", [])
281
            )
282
            oauth_endpoint = add_query_params_to_url(oauth_endpoint, query_object)
1✔
283

284
            client_id = auth_secret.get("client_id", "")
1✔
285
            client_secret = auth_secret.get("client_secret", "")
1✔
286

287
            oauth_body = list_of_parameters_to_object(
1✔
288
                oauth_http_parameters.get("BodyParameters", [])
289
            )
290
            oauth_body.update({"client_id": client_id, "client_secret": client_secret})
1✔
291

292
            oauth_header = list_of_parameters_to_object(
1✔
293
                oauth_http_parameters.get("HeaderParameters", [])
294
            )
295
            oauth_result = requests.request(
1✔
296
                method=oauth_method,
297
                url=oauth_endpoint,
298
                data=json.dumps(oauth_body),
299
                headers=oauth_header,
300
            )
301
            oauth_data = json.loads(oauth_result.text)
1✔
302

303
            token_type = oauth_data.get("token_type", "")
1✔
304
            access_token = oauth_data.get("access_token", "")
1✔
305
            auth_header = f"{token_type} {access_token}"
1✔
306
            headers.update({"authorization": auth_header})
1✔
307

308
    return headers
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc