• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / d8dc9956-71ea-40c6-95cb-26b2b584943a

08 May 2025 05:15PM UTC coverage: 86.66% (+0.1%) from 86.535%
d8dc9956-71ea-40c6-95cb-26b2b584943a

push

circleci

web-flow
CFn v2: Skip media type assertion (#12597)

64346 of 74251 relevant lines covered (86.66%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.12
/localstack-core/localstack/services/lambda_/resource_providers/aws_lambda_version.py
1
# LocalStack Resource Provider Scaffolding v2
2
from __future__ import annotations
1✔
3

4
from pathlib import Path
1✔
5
from typing import Optional, TypedDict
1✔
6

7
import localstack.services.cloudformation.provider_utils as util
1✔
8
from localstack.services.cloudformation.resource_provider import (
1✔
9
    OperationStatus,
10
    ProgressEvent,
11
    ResourceProvider,
12
    ResourceRequest,
13
)
14

15

16
class LambdaVersionProperties(TypedDict):
    """Resource model for an ``AWS::Lambda::Version`` CloudFormation resource."""

    # Required, create-only; passed to publish_version and used when
    # configuring / polling provisioned concurrency.
    FunctionName: Optional[str]
    # Optional inputs forwarded to publish_version together with FunctionName.
    CodeSha256: Optional[str]
    Description: Optional[str]
    # Primary identifier (read-only); filled from the "FunctionArn" returned
    # by publish_version.
    Id: Optional[str]
    # When present and truthy, provisioned concurrency is configured for the
    # freshly published version.
    ProvisionedConcurrencyConfig: Optional[ProvisionedConcurrencyConfiguration]
    # Read-only; filled from the "Version" returned by publish_version.
    Version: Optional[str]
1✔
23

24

25
class ProvisionedConcurrencyConfiguration(TypedDict):
    """Shape of the ``ProvisionedConcurrencyConfig`` property of a Lambda version."""

    # Forwarded as the ProvisionedConcurrentExecutions argument of
    # put_provisioned_concurrency_config when the version is created.
    ProvisionedConcurrentExecutions: Optional[int]
1✔
27

28

29
# Key stored in the request's custom context once `create` has issued its
# initial API calls; invocations that find it set only poll for readiness.
REPEATED_INVOCATION = "repeated_invocation"
1✔
30

31

32
class LambdaVersionProvider(ResourceProvider[LambdaVersionProperties]):
    """Resource provider for ``AWS::Lambda::Version``.

    ``create`` publishes a function version (optionally configuring
    provisioned concurrency) and reports progress until Lambda marks it
    ready; ``delete`` removes the published version. ``read`` and ``update``
    are not implemented.
    """

    TYPE = "AWS::Lambda::Version"  # Autogenerated. Don't change
    SCHEMA = util.get_schema_path(Path(__file__))  # Autogenerated. Don't change

    @staticmethod
    def _state_event(
        model: LambdaVersionProperties,
        custom_context: dict,
        *,
        subject: str,
        state: str,
        pending_state: str,
        ready_state: str,
        reason: Optional[str] = None,
    ) -> ProgressEvent[LambdaVersionProperties]:
        """Translate a state reported by Lambda into a progress event.

        :param model: resource model to attach to the event
        :param custom_context: context carried over to the next invocation
            while the operation is still in progress
        :param subject: human-readable name of what was polled, used in the
            failure message
        :param state: state/status value returned by Lambda
        :param pending_state: value of ``state`` meaning "keep polling"
        :param ready_state: value of ``state`` meaning "done"
        :param reason: optional explanation reported by Lambda, appended to
            the failure message
        :return: IN_PROGRESS for ``pending_state``, SUCCESS for
            ``ready_state``, FAILED for anything else
        """
        if state == pending_state:
            return ProgressEvent(
                status=OperationStatus.IN_PROGRESS,
                resource_model=model,
                custom_context=custom_context,
            )
        if state == ready_state:
            return ProgressEvent(
                status=OperationStatus.SUCCESS,
                resource_model=model,
            )

        # Any other state is treated as a failure. Surface what Lambda told us
        # instead of an empty message so the failure can be diagnosed.
        message = f"{subject} is in state '{state}'"
        if reason:
            message = f"{message}: {reason}"
        return ProgressEvent(
            status=OperationStatus.FAILED,
            resource_model=model,
            message=message,
            error_code="VersionStateFailure",  # TODO: not parity tested
        )

    def create(
        self,
        request: ResourceRequest[LambdaVersionProperties],
    ) -> ProgressEvent[LambdaVersionProperties]:
        """
        Create a new resource.

        Primary identifier fields:
          - /properties/Id

        Required properties:
          - FunctionName

        Create-only properties:
          - /properties/FunctionName

        Read-only properties:
          - /properties/Id
          - /properties/Version

        The first invocation publishes the version, stores ``Version`` and
        ``Id`` on the model, optionally configures provisioned concurrency,
        marks the custom context with ``REPEATED_INVOCATION`` and returns
        IN_PROGRESS. Invocations that find the marker only poll Lambda and
        return IN_PROGRESS, SUCCESS or FAILED depending on the reported state.
        """
        model = request.desired_state
        lambda_client = request.aws_client_factory.lambda_
        ctx = request.custom_context

        params = util.select_attributes(model, ["FunctionName", "CodeSha256", "Description"])

        if not ctx.get(REPEATED_INVOCATION):
            response = lambda_client.publish_version(**params)
            model["Version"] = response["Version"]
            model["Id"] = response["FunctionArn"]
            if model.get("ProvisionedConcurrencyConfig"):
                lambda_client.put_provisioned_concurrency_config(
                    FunctionName=model["FunctionName"],
                    Qualifier=model["Version"],
                    ProvisionedConcurrentExecutions=model["ProvisionedConcurrencyConfig"][
                        "ProvisionedConcurrentExecutions"
                    ],
                )
            ctx[REPEATED_INVOCATION] = True
            return ProgressEvent(
                status=OperationStatus.IN_PROGRESS,
                resource_model=model,
                custom_context=request.custom_context,
            )

        if model.get("ProvisionedConcurrencyConfig"):
            # Assumption: Ready provisioned concurrency implies the function version is ready
            provisioned_concurrency_config = lambda_client.get_provisioned_concurrency_config(
                FunctionName=model["FunctionName"],
                Qualifier=model["Version"],
            )
            return self._state_event(
                model,
                request.custom_context,
                subject=f"Provisioned concurrency for Lambda version {model['Version']}",
                state=provisioned_concurrency_config["Status"],
                pending_state="IN_PROGRESS",
                ready_state="READY",
                reason=provisioned_concurrency_config.get("StatusReason"),
            )

        version = lambda_client.get_function(FunctionName=model["Id"])
        configuration = version["Configuration"]
        return self._state_event(
            model,
            request.custom_context,
            subject=f"Lambda version {model['Version']}",
            state=configuration["State"],
            pending_state="Pending",
            ready_state="Active",
            reason=configuration.get("StateReason"),
        )

    def read(
        self,
        request: ResourceRequest[LambdaVersionProperties],
    ) -> ProgressEvent[LambdaVersionProperties]:
        """
        Fetch resource information

        Not implemented for this resource type.

        :raises NotImplementedError: always
        """
        raise NotImplementedError

    def delete(
        self,
        request: ResourceRequest[LambdaVersionProperties],
    ) -> ProgressEvent[LambdaVersionProperties]:
        """
        Delete a resource

        Deletes the published version identified by the model's ``Id`` and
        ``Version`` and returns a SUCCESS event carrying the model.
        """
        model = request.desired_state
        lambda_client = request.aws_client_factory.lambda_

        # without qualifier entire function is deleted instead of just version
        # provisioned concurrency is automatically deleted upon deleting a function or function version
        lambda_client.delete_function(FunctionName=model["Id"], Qualifier=model["Version"])

        return ProgressEvent(
            status=OperationStatus.SUCCESS,
            resource_model=model,
            custom_context=request.custom_context,
        )

    def update(
        self,
        request: ResourceRequest[LambdaVersionProperties],
    ) -> ProgressEvent[LambdaVersionProperties]:
        """
        Update a resource

        Not implemented for this resource type.

        :raises NotImplementedError: always
        """
        raise NotImplementedError
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc