• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / d8dc9956-71ea-40c6-95cb-26b2b584943a

08 May 2025 05:15PM UTC coverage: 86.66% (+0.1%) from 86.535%
d8dc9956-71ea-40c6-95cb-26b2b584943a

push

circleci

web-flow
CFn v2: Skip media type assertion (#12597)

64346 of 74251 relevant lines covered (86.66%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.44
/localstack-core/localstack/services/lambda_/resource_providers/lambda_alias.py
1
# LocalStack Resource Provider Scaffolding v2
2
from __future__ import annotations
1✔
3

4
from pathlib import Path
1✔
5
from typing import Optional, TypedDict
1✔
6

7
import localstack.services.cloudformation.provider_utils as util
1✔
8
from localstack.services.cloudformation.resource_provider import (
1✔
9
    OperationStatus,
10
    ProgressEvent,
11
    ResourceProvider,
12
    ResourceRequest,
13
)
14

15

16
class LambdaAliasProperties(TypedDict):
1✔
17
    FunctionName: Optional[str]
1✔
18
    FunctionVersion: Optional[str]
1✔
19
    Name: Optional[str]
1✔
20
    Description: Optional[str]
1✔
21
    Id: Optional[str]
1✔
22
    ProvisionedConcurrencyConfig: Optional[ProvisionedConcurrencyConfiguration]
1✔
23
    RoutingConfig: Optional[AliasRoutingConfiguration]
1✔
24

25

26
class ProvisionedConcurrencyConfiguration(TypedDict):
1✔
27
    ProvisionedConcurrentExecutions: Optional[int]
1✔
28

29

30
class VersionWeight(TypedDict):
1✔
31
    FunctionVersion: Optional[str]
1✔
32
    FunctionWeight: Optional[float]
1✔
33

34

35
class AliasRoutingConfiguration(TypedDict):
1✔
36
    AdditionalVersionWeights: Optional[list[VersionWeight]]
1✔
37

38

39
REPEATED_INVOCATION = "repeated_invocation"
1✔
40

41

42
class LambdaAliasProvider(ResourceProvider[LambdaAliasProperties]):
1✔
43
    TYPE = "AWS::Lambda::Alias"  # Autogenerated. Don't change
1✔
44
    SCHEMA = util.get_schema_path(Path(__file__))  # Autogenerated. Don't change
1✔
45

46
    def create(
1✔
47
        self,
48
        request: ResourceRequest[LambdaAliasProperties],
49
    ) -> ProgressEvent[LambdaAliasProperties]:
50
        """
51
        Create a new resource.
52

53
        Primary identifier fields:
54
          - /properties/Id
55

56
        Required properties:
57
          - FunctionName
58
          - FunctionVersion
59
          - Name
60

61
        Create-only properties:
62
          - /properties/Name
63
          - /properties/FunctionName
64

65
        Read-only properties:
66
          - /properties/Id
67

68

69

70
        """
71
        model = request.desired_state
1✔
72
        lambda_ = request.aws_client_factory.lambda_
1✔
73

74
        create_params = util.select_attributes(
1✔
75
            model, ["FunctionName", "FunctionVersion", "Name", "Description", "RoutingConfig"]
76
        )
77

78
        ctx = request.custom_context
1✔
79
        if not ctx.get(REPEATED_INVOCATION):
1✔
80
            result = lambda_.create_alias(**create_params)
1✔
81
            model["Id"] = result["AliasArn"]
1✔
82
            ctx[REPEATED_INVOCATION] = True
1✔
83

84
            if model.get("ProvisionedConcurrencyConfig"):
1✔
85
                lambda_.put_provisioned_concurrency_config(
1✔
86
                    FunctionName=model["FunctionName"],
87
                    Qualifier=model["Name"],
88
                    ProvisionedConcurrentExecutions=model["ProvisionedConcurrencyConfig"][
89
                        "ProvisionedConcurrentExecutions"
90
                    ],
91
                )
92

93
            return ProgressEvent(
1✔
94
                status=OperationStatus.IN_PROGRESS,
95
                resource_model=model,
96
                custom_context=ctx,
97
            )
98

99
        if ctx.get(REPEATED_INVOCATION) and model.get("ProvisionedConcurrencyConfig"):
1✔
100
            # get provisioned config status
101
            result = lambda_.get_provisioned_concurrency_config(
1✔
102
                FunctionName=model["FunctionName"],
103
                Qualifier=model["Name"],
104
            )
105
            if result["Status"] == "IN_PROGRESS":
1✔
106
                return ProgressEvent(
1✔
107
                    status=OperationStatus.IN_PROGRESS,
108
                    resource_model=model,
109
                )
110
            elif result["Status"] == "READY":
1✔
111
                return ProgressEvent(
1✔
112
                    status=OperationStatus.SUCCESS,
113
                    resource_model=model,
114
                )
115
            else:
116
                return ProgressEvent(
×
117
                    status=OperationStatus.FAILED,
118
                    resource_model=model,
119
                    message="",
120
                    error_code="VersionStateFailure",  # TODO: not parity tested
121
                )
122

123
        return ProgressEvent(
1✔
124
            status=OperationStatus.SUCCESS,
125
            resource_model=model,
126
        )
127

128
    def read(
1✔
129
        self,
130
        request: ResourceRequest[LambdaAliasProperties],
131
    ) -> ProgressEvent[LambdaAliasProperties]:
132
        """
133
        Fetch resource information
134

135

136
        """
137
        raise NotImplementedError
138

139
    def delete(
1✔
140
        self,
141
        request: ResourceRequest[LambdaAliasProperties],
142
    ) -> ProgressEvent[LambdaAliasProperties]:
143
        """
144
        Delete a resource
145

146

147
        """
148
        model = request.desired_state
1✔
149
        lambda_ = request.aws_client_factory.lambda_
1✔
150

151
        try:
1✔
152
            # provisioned concurrency is automatically deleted upon deleting a function alias
153
            lambda_.delete_alias(
1✔
154
                FunctionName=model["FunctionName"],
155
                Name=model["Name"],
156
            )
157
        except lambda_.exceptions.ResourceNotFoundException:
×
158
            pass
×
159

160
        return ProgressEvent(
1✔
161
            status=OperationStatus.SUCCESS,
162
            resource_model=request.previous_state,
163
        )
164

165
    def update(
1✔
166
        self,
167
        request: ResourceRequest[LambdaAliasProperties],
168
    ) -> ProgressEvent[LambdaAliasProperties]:
169
        """
170
        Update a resource
171

172

173
        """
174
        raise NotImplementedError
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc