• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20565403496

29 Dec 2025 05:11AM UTC coverage: 84.103% (-2.8%) from 86.921%
20565403496

Pull #13567

github

web-flow
Merge 4816837a5 into 2417384aa
Pull Request #13567: Update ASF APIs

67166 of 79862 relevant lines covered (84.1%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.0
/localstack-core/localstack/services/lambda_/resource_providers/aws_lambda_permission.py
1
# LocalStack Resource Provider Scaffolding v2
2
from __future__ import annotations
1✔
3

4
import json
1✔
5
from pathlib import Path
1✔
6
from typing import TypedDict
1✔
7

8
import localstack.services.cloudformation.provider_utils as util
1✔
9
from localstack.services.cloudformation.resource_provider import (
1✔
10
    OperationStatus,
11
    ProgressEvent,
12
    ResourceProvider,
13
    ResourceRequest,
14
)
15

16

17
class LambdaPermissionProperties(TypedDict):
1✔
18
    Action: str | None
1✔
19
    FunctionName: str | None
1✔
20
    Principal: str | None
1✔
21
    EventSourceToken: str | None
1✔
22
    FunctionUrlAuthType: str | None
1✔
23
    Id: str | None
1✔
24
    PrincipalOrgID: str | None
1✔
25
    SourceAccount: str | None
1✔
26
    SourceArn: str | None
1✔
27

28

29
REPEATED_INVOCATION = "repeated_invocation"
1✔
30

31

32
class LambdaPermissionProvider(ResourceProvider[LambdaPermissionProperties]):
1✔
33
    TYPE = "AWS::Lambda::Permission"  # Autogenerated. Don't change
1✔
34
    SCHEMA = util.get_schema_path(Path(__file__))  # Autogenerated. Don't change
1✔
35

36
    def create(
1✔
37
        self,
38
        request: ResourceRequest[LambdaPermissionProperties],
39
    ) -> ProgressEvent[LambdaPermissionProperties]:
40
        """
41
        Create a new resource.
42

43
        Primary identifier fields:
44
          - /properties/Id
45

46
        Required properties:
47
          - FunctionName
48
          - Action
49
          - Principal
50

51
        Create-only properties:
52
          - /properties/SourceAccount
53
          - /properties/FunctionUrlAuthType
54
          - /properties/SourceArn
55
          - /properties/Principal
56
          - /properties/Action
57
          - /properties/FunctionName
58
          - /properties/EventSourceToken
59
          - /properties/PrincipalOrgID
60

61
        Read-only properties:
62
          - /properties/Id
63

64

65
        """
66
        model = request.desired_state
1✔
67
        lambda_client = request.aws_client_factory.lambda_
1✔
68

69
        params = util.select_attributes(
1✔
70
            model=model, params=["FunctionName", "Action", "Principal", "SourceArn"]
71
        )
72

73
        params["StatementId"] = util.generate_default_name(
1✔
74
            request.stack_name, request.logical_resource_id
75
        )
76

77
        response = lambda_client.add_permission(**params)
1✔
78

79
        parsed_statement = json.loads(response["Statement"])
1✔
80
        model["Id"] = parsed_statement["Sid"]
1✔
81

82
        return ProgressEvent(
1✔
83
            status=OperationStatus.SUCCESS,
84
            resource_model=model,
85
            custom_context=request.custom_context,
86
        )
87

88
    def read(
1✔
89
        self,
90
        request: ResourceRequest[LambdaPermissionProperties],
91
    ) -> ProgressEvent[LambdaPermissionProperties]:
92
        """
93
        Fetch resource information
94

95

96
        """
97
        raise NotImplementedError
98

99
    def delete(
1✔
100
        self,
101
        request: ResourceRequest[LambdaPermissionProperties],
102
    ) -> ProgressEvent[LambdaPermissionProperties]:
103
        """
104
        Delete a resource
105

106

107
        """
108
        model = request.desired_state
1✔
109
        lambda_client = request.aws_client_factory.lambda_
1✔
110
        try:
1✔
111
            lambda_client.remove_permission(
1✔
112
                FunctionName=model.get("FunctionName"), StatementId=model["Id"]
113
            )
114
        except lambda_client.exceptions.ResourceNotFoundException:
1✔
115
            pass
1✔
116

117
        return ProgressEvent(
1✔
118
            status=OperationStatus.SUCCESS,
119
            resource_model=model,
120
            custom_context=request.custom_context,
121
        )
122

123
    def update(
1✔
124
        self,
125
        request: ResourceRequest[LambdaPermissionProperties],
126
    ) -> ProgressEvent[LambdaPermissionProperties]:
127
        """
128
        Update a resource
129

130

131
        """
132
        model = request.desired_state
×
133
        lambda_client = request.aws_client_factory.lambda_
×
134

135
        if not model.get("Id"):
×
136
            model["Id"] = request.previous_state["Id"]
×
137

138
        params = util.select_attributes(
×
139
            model=model, params=["FunctionName", "Action", "Principal", "SourceArn"]
140
        )
141

142
        try:
×
143
            lambda_client.remove_permission(
×
144
                FunctionName=model.get("FunctionName"), StatementId=model["Id"]
145
            )
146
        except lambda_client.exceptions.ResourceNotFoundException:
×
147
            pass
×
148

149
        lambda_client.add_permission(StatementId=model["Id"], **params)
×
150

151
        return ProgressEvent(
×
152
            status=OperationStatus.SUCCESS,
153
            resource_model=model,
154
            custom_context=request.custom_context,
155
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc