• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / a53c0273-9548-479e-ab3c-f3af40c0e980

13 May 2025 05:31PM UTC coverage: 86.624% (-0.03%) from 86.658%
a53c0273-9548-479e-ab3c-f3af40c0e980

push

circleci

web-flow
ASF: Mark optional params as such (X | None) (#12614)

5 of 7 new or added lines in 2 files covered. (71.43%)

34 existing lines in 16 files now uncovered.

64347 of 74283 relevant lines covered (86.62%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.33
/localstack-core/localstack/utils/aws/resources.py
1
from localstack.aws.api.dynamodb import CreateTableOutput, DescribeTableOutput
1✔
2
from localstack.aws.connect import connect_to
1✔
3
from localstack.constants import AWS_REGION_US_EAST_1
1✔
4
from localstack.utils.aws.aws_stack import LOG
1✔
5
from localstack.utils.functions import run_safe
1✔
6
from localstack.utils.sync import poll_condition
1✔
7

8

9
# TODO: make s3_client mandatory
10
def get_or_create_bucket(bucket_name: str, s3_client=None):
    """Return the ``head_bucket`` response for a bucket, creating the bucket on failure.

    :param bucket_name: name of the bucket to look up (or create)
    :param s3_client: S3 client to use; when falsy, ``connect_to().s3`` is used
    :return: the ``head_bucket`` response if that call succeeds, otherwise
        whatever ``create_s3_bucket`` returns
    """
    client = s3_client if s3_client else connect_to().s3
    try:
        head_response = client.head_bucket(Bucket=bucket_name)
    except Exception:
        # Any failure of head_bucket is treated as "bucket missing" -> create it
        # with the same client that was used for the lookup.
        return create_s3_bucket(bucket_name, s3_client=client)
    return head_response
×
16

17

18
def create_s3_bucket(bucket_name: str, s3_client=None):
    """Create an S3 bucket in the region of the client being used.

    The region is read from ``s3_client.meta.region_name``. When no client is
    given, the client returned by ``connect_to().s3`` is used instead.

    :param bucket_name: name of the bucket to create
    :param s3_client: optional S3 client; falls back to ``connect_to().s3``
    :return: the response of the ``create_bucket`` call
    """
    client = s3_client if s3_client else connect_to().s3
    bucket_region = client.meta.region_name

    # No CreateBucketConfiguration is sent for us-east-1; every other region
    # is passed explicitly as the LocationConstraint.
    if bucket_region == AWS_REGION_US_EAST_1:
        return client.create_bucket(Bucket=bucket_name)
    return client.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={"LocationConstraint": bucket_region},
    )
1✔
27

28

29
# TODO: Harmonise the return value
30
def create_dynamodb_table(
    table_name: str,
    partition_key: str,
    stream_view_type: str | None = None,
    region_name: str | None = None,
    client=None,
    wait_for_active: bool = True,
) -> CreateTableOutput | DescribeTableOutput:
    """Utility method to create a DynamoDB table.

    The table is created with a single string-typed HASH key and
    ``PAY_PER_REQUEST`` billing.

    :param table_name: name of the table to create
    :param partition_key: attribute name used as the HASH key (attribute type "S")
    :param stream_view_type: if not None, a stream is enabled with this view type
    :param region_name: region passed to ``connect_to`` when no client is given
    :param client: DynamoDB client to use; falls back to
        ``connect_to(region_name=region_name).dynamodb``
    :param wait_for_active: if True, poll ``describe_table`` until the table
        status is "ACTIVE" before returning
    :return: the ``create_table`` response, or the ``describe_table`` response
        if the table already exists. NOTE: if ``create_table`` fails with any
        other (non access-denied) error, the error is logged and ``None`` is
        returned.
    :raises Exception: re-raises the original error if its text contains
        "AccessDeniedException"
    """

    dynamodb = client or connect_to(region_name=region_name).dynamodb
    if stream_view_type is not None:
        stream_spec = {"StreamEnabled": True, "StreamViewType": stream_view_type}
    else:
        stream_spec = {"StreamEnabled": False}
    key_schema = [{"AttributeName": partition_key, "KeyType": "HASH"}]
    attr_defs = [{"AttributeName": partition_key, "AttributeType": "S"}]

    table = None
    try:
        table = dynamodb.create_table(
            TableName=table_name,
            KeySchema=key_schema,
            AttributeDefinitions=attr_defs,
            BillingMode="PAY_PER_REQUEST",
            StreamSpecification=stream_spec,
        )
    except Exception as e:
        error_text = str(e)
        if "ResourceInUseException" in error_text:
            # Table already exists -> return table reference
            return dynamodb.describe_table(TableName=table_name)
        if "AccessDeniedException" in error_text:
            raise
        # Other errors are deliberately not propagated (best-effort), but they
        # are no longer dropped silently so that failures can be diagnosed.
        LOG.warning('Error creating DynamoDB table "%s" (ignored): %s', table_name, e)

    def _is_active():
        return dynamodb.describe_table(TableName=table_name)["Table"]["TableStatus"] == "ACTIVE"

    if wait_for_active:
        poll_condition(_is_active)

    return table
1✔
69

70

71
# TODO make client mandatory
72
def create_api_gateway(
    name,
    description=None,
    resources=None,
    stage_name=None,
    enabled_api_keys=None,
    usage_plan_name=None,
    auth_creator_func=None,  # function that receives an api_id and returns an authorizer_id
    client=None,
):
    """Create a REST API with resources, methods and integrations, then deploy it.

    :param name: name of the REST API
    :param description: API description; defaults to a generated test description
    :param resources: mapping of resource path (segments separated by "/") to a
        list of method dicts. Each method dict must contain "httpMethod" and
        "integrations", and may contain "authorizationType", "apiKeyRequired",
        "requestParameters" and "requestModels". Defaults to no resources.
    :param stage_name: stage to deploy to; defaults to "testing"
    :param enabled_api_keys: accepted for backward compatibility; not used by
        this function
    :param usage_plan_name: accepted for backward compatibility; not used by
        this function
    :param auth_creator_func: optional callable that receives the API id and
        returns an authorizer id, which is then attached to every method
    :param client: API Gateway client; falls back to ``connect_to().apigateway``
    :return: the response of the ``create_rest_api`` call
    """
    if not client:
        client = connect_to().apigateway
    # `resources` is iterated with .items() below, so the empty default must be
    # a dict (an empty list would raise AttributeError).
    resources = resources or {}
    stage_name = stage_name or "testing"
    description = description or f'Test description for API "{name}"'

    LOG.info('Creating API resources under API Gateway "%s".', name)
    api = client.create_rest_api(name=name, description=description)
    api_id = api["id"]

    auth_id = None
    if auth_creator_func:
        auth_id = auth_creator_func(api_id)
    # the authorizer (if any) is the same for every method -> build kwargs once
    method_kwargs = {"authorizerId": auth_id} if auth_id else {}

    resources_list = client.get_resources(restApiId=api_id)
    # assumes the first listed resource is the API's root resource - TODO confirm
    root_res_id = resources_list["items"][0]["id"]
    # add API resources and methods
    for path, methods in resources.items():
        # create one nested resource per path segment, starting at the root
        parent_id = root_res_id
        for path_part in path.split("/"):
            api_resource = client.create_resource(
                restApiId=api_id, parentId=parent_id, pathPart=path_part
            )
            parent_id = api_resource["id"]
        # after the loop, parent_id is the id of the innermost (leaf) resource
        resource_id = parent_id
        # add methods to the API resource
        for method in methods:
            client.put_method(
                restApiId=api_id,
                resourceId=resource_id,
                httpMethod=method["httpMethod"],
                authorizationType=method.get("authorizationType") or "NONE",
                apiKeyRequired=method.get("apiKeyRequired") or False,
                requestParameters=method.get("requestParameters") or {},
                requestModels=method.get("requestModels") or {},
                **method_kwargs,
            )
            # create integrations for this API resource/method
            create_api_gateway_integrations(
                api_id,
                resource_id,
                method,
                method["integrations"],
                client=client,
            )

    # deploy the API gateway
    client.create_deployment(restApiId=api_id, stageName=stage_name)
    return api
1✔
136

137

138
# TODO make client mandatory
139
def create_api_gateway_integrations(api_id, resource_id, method, integrations=None, client=None):
    """Register integrations plus their integration/method responses for one method.

    For every entry in ``integrations`` this calls ``put_integration`` once and
    then, for each of the 2xx/4xx/5xx selection patterns, calls
    ``put_integration_response`` followed by ``put_method_response``.

    :param api_id: id of the REST API
    :param resource_id: id of the API resource the method belongs to
    :param method: method dict; "httpMethod" is required, "integrationHttpMethod"
        is optional and defaults to the value of "httpMethod"
    :param integrations: list of integration dicts; "type" and "uri" are
        required, "requestTemplates", "responseTemplates", "requestParameters",
        "credentials", "successCode", "clientErrorCode" and "serverErrorCode"
        are optional. ``None`` is treated as an empty list.
    :param client: API Gateway client; falls back to ``connect_to().apigateway``
    """
    if integrations is None:
        integrations = []
    if not client:
        client = connect_to().apigateway

    for spec in integrations:
        # (selection pattern, status code, response templates) per status class;
        # only the success class carries the configured response templates
        responses = (
            ("^2.*", spec.get("successCode") or "200", spec.get("responseTemplates") or {}),
            ("^4.*", spec.get("clientErrorCode") or "400", {}),
            ("^5.*", spec.get("serverErrorCode") or "500", {}),
        )

        # create integration
        client.put_integration(
            restApiId=api_id,
            resourceId=resource_id,
            httpMethod=method["httpMethod"],
            integrationHttpMethod=method.get("integrationHttpMethod") or method["httpMethod"],
            type=spec["type"],
            uri=spec["uri"],
            requestTemplates=spec.get("requestTemplates") or {},
            requestParameters=spec.get("requestParameters") or {},
            credentials=spec.get("credentials") or "",
        )

        for pattern, status_code, templates in responses:
            # create integration response
            client.put_integration_response(
                restApiId=api_id,
                resourceId=resource_id,
                httpMethod=method["httpMethod"],
                statusCode=status_code,
                responseTemplates=templates,
                selectionPattern=pattern,
            )
            # create method response
            client.put_method_response(
                restApiId=api_id,
                resourceId=resource_id,
                httpMethod=method["httpMethod"],
                statusCode=status_code,
            )
188

189

190
def create_kinesis_stream(client, stream_name: str, shards: int = 1, delete: bool = False) -> None:
    """Create a Kinesis stream and optionally request its deletion right away.

    :param client: Kinesis client used for ``create_stream`` / ``delete_stream``
    :param stream_name: name of the stream
    :param shards: shard count passed to ``create_stream``
    :param delete: if True, ``delete_stream`` is invoked through ``run_safe``
        directly after creation (presumably so that errors during deletion are
        suppressed - confirm against ``run_safe``)
    """
    client.create_stream(StreamName=stream_name, ShardCount=shards)
    if not delete:
        return

    def _delete_stream():
        return client.delete_stream(StreamName=stream_name)

    run_safe(_delete_stream, print_error=False)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc