• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 16820655284

07 Aug 2025 05:03PM UTC coverage: 86.841% (-0.05%) from 86.892%
16820655284

push

github

web-flow
CFNV2: support CDK bootstrap and deployment (#12967)

32 of 38 new or added lines in 5 files covered. (84.21%)

2013 existing lines in 125 files now uncovered.

66606 of 76699 relevant lines covered (86.84%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.1
/localstack-core/localstack/services/apigateway/next_gen/execute_api/handlers/api_key_validation.py
1
import logging
1✔
2

3
from localstack.aws.api.apigateway import ApiKey, ApiKeySourceType, RestApi
1✔
4
from localstack.http import Response
1✔
5

6
from ..api import RestApiGatewayHandler, RestApiGatewayHandlerChain
1✔
7
from ..context import InvocationRequest, RestApiInvocationContext
1✔
8
from ..gateway_response import InvalidAPIKeyError
1✔
9
from ..moto_helpers import get_api_key, get_usage_plan_keys, get_usage_plans
1✔
10
from ..variables import ContextVarsIdentity
1✔
11

12
LOG = logging.getLogger(__name__)
1✔
13

14

15
class ApiKeyValidationHandler(RestApiGatewayHandler):
1✔
16
    """
17
    Handles API key validation.
18
    If an api key is required, we will validate that a usage plan associated with that stage
19
    has a usage plan key with the corresponding value.
20
    """
21

22
    # TODO: Rate limiting and quota limits are not supported yet, so no related exceptions are raised.
23

24
    def __call__(
1✔
25
        self,
26
        chain: RestApiGatewayHandlerChain,
27
        context: RestApiInvocationContext,
28
        response: Response,
29
    ):
30
        method = context.resource_method
1✔
31
        request = context.invocation_request
1✔
32
        rest_api = context.deployment.rest_api.rest_api
1✔
33

34
        # If api key is not required by the method, we can exit the handler
35
        if not method.get("apiKeyRequired"):
1✔
36
            return
1✔
37

38
        identity = context.context_variables.get("identity")
1✔
39

40
        # Look for the api key value in the request. If it is not found, raise an exception
41
        if not (api_key_value := self.get_request_api_key(rest_api, request, identity)):
1✔
42
            LOG.debug("API Key is empty")
1✔
43
            raise InvalidAPIKeyError("Forbidden")
1✔
44

45
        # Get the validated key; if no key is found, raise an exception
46
        if not (validated_key := self.validate_api_key(api_key_value, context)):
1✔
47
            LOG.debug("Provided API Key is not valid")
1✔
48
            raise InvalidAPIKeyError("Forbidden")
1✔
49

50
        # Update the context's identity with the key value and id
51
        if not identity.get("apiKey"):
1✔
52
            LOG.debug("Updating $context.identity.apiKey='%s'", validated_key["value"])
1✔
53
            identity["apiKey"] = validated_key["value"]
1✔
54

55
        LOG.debug("Updating $context.identity.apiKeyId='%s'", validated_key["id"])
1✔
56
        identity["apiKeyId"] = validated_key["id"]
1✔
57

58
    def validate_api_key(self, api_key_value, context: RestApiInvocationContext) -> ApiKey | None:
1✔
59
        api_id = context.api_id
1✔
60
        stage = context.stage
1✔
61
        account_id = context.account_id
1✔
62
        region = context.region
1✔
63

64
        # Get usage plans from the store
65
        usage_plans = get_usage_plans(account_id=account_id, region_name=region)
1✔
66

67
        # Loop through usage plans and keep ids of the plans associated with the deployment stage
68
        usage_plan_ids = []
1✔
69
        for usage_plan in usage_plans:
1✔
70
            api_stages = usage_plan.get("apiStages", [])
1✔
71
            usage_plan_ids.extend(
1✔
72
                usage_plan.get("id")
73
                for api_stage in api_stages
74
                if (api_stage.get("stage") == stage and api_stage.get("apiId") == api_id)
75
            )
76
        if not usage_plan_ids:
1✔
UNCOV
77
            LOG.debug("No associated usage plans found stage '%s'", stage)
×
UNCOV
78
            return
×
79

80
        # Loop through the plans associated with the stage to find a key with a matching value
81
        for usage_plan_id in usage_plan_ids:
1✔
82
            usage_plan_keys = get_usage_plan_keys(
1✔
83
                usage_plan_id=usage_plan_id, account_id=account_id, region_name=region
84
            )
85
            for key in usage_plan_keys:
1✔
86
                if key["value"] == api_key_value:
1✔
87
                    api_key = get_api_key(
1✔
88
                        api_key_id=key["id"], account_id=account_id, region_name=region
89
                    )
90
                    LOG.debug("Found Api Key '%s'", api_key["id"])
1✔
91
                    return api_key if api_key["enabled"] else None
1✔
92

93
    def get_request_api_key(
1✔
94
        self, rest_api: RestApi, request: InvocationRequest, identity: ContextVarsIdentity
95
    ) -> str | None:
96
        """https://docs.aws.amazon.com/apigateway/latest/developerguide/api-gateway-api-key-source.html
97
        The source of the API key for metering requests according to a usage plan.
98
        Valid values are:
99
        - HEADER to read the API key from the X-API-Key header of a request.
100
        - AUTHORIZER to read the API key from the Context Variables.
101
        """
102
        match api_key_source := rest_api.get("apiKeySource"):
1✔
103
            case ApiKeySourceType.HEADER:
1✔
104
                LOG.debug("Looking for api key in header 'X-API-Key'")
1✔
105
                return request.get("headers", {}).get("X-API-Key")
1✔
106
            case ApiKeySourceType.AUTHORIZER:
1✔
107
                LOG.debug("Looking for api key in Identity Context")
1✔
108
                return identity.get("apiKey")
1✔
UNCOV
109
            case _:
×
UNCOV
110
                LOG.debug("Api Key Source is not valid: '%s'", api_key_source)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc