• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 16820655284

07 Aug 2025 05:03PM UTC coverage: 86.841% (-0.05%) from 86.892%
16820655284

push

github

web-flow
CFNV2: support CDK bootstrap and deployment (#12967)

32 of 38 new or added lines in 5 files covered. (84.21%)

2013 existing lines in 125 files now uncovered.

66606 of 76699 relevant lines covered (86.84%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.42
/localstack-core/localstack/aws/scaffold.py
1
import io
1✔
2
import keyword
1✔
3
import re
1✔
4
from collections import OrderedDict
1✔
5
from functools import cached_property
1✔
6
from multiprocessing import Pool
1✔
7
from pathlib import Path
1✔
8

9
import click
1✔
10
from botocore import xform_name
1✔
11
from botocore.exceptions import UnknownServiceError
1✔
12
from botocore.model import (
1✔
13
    ListShape,
14
    MapShape,
15
    OperationModel,
16
    ServiceModel,
17
    Shape,
18
    StringShape,
19
    StructureShape,
20
)
21

22
from localstack.aws.spec import load_service
1✔
23
from localstack.utils.common import camel_to_snake_case, snake_to_camel_case
1✔
24

25
# Names that must not be used verbatim as identifiers in generated code: the Python keywords,
# plus "type" (some minification packages might treat it as a keyword) and "Optional"/"Union"
# (some specs define shapes with exactly these names, which are also imported from ``typing``
# by the generated modules).
KEYWORDS = [*keyword.kwlist, "type", "Optional", "Union"]
# Membership test bound to the list above: ``is_keyword(name)`` is ``name in KEYWORDS``.
is_keyword = KEYWORDS.__contains__
1✔
28

29

30
def is_bad_param_name(name: str) -> bool:
    """
    Return True if ``name`` cannot be rendered as an expanded handler parameter.

    ``context`` is rejected because the generated handler signatures already declare a
    ``context: RequestContext`` parameter; every name in ``KEYWORDS`` is rejected as well.
    """
    return name == "context" or is_keyword(name)
1✔
38

39

40
def to_valid_python_name(spec_name: str) -> str:
    """
    Turn a name taken from a service spec into a usable Python identifier.

    Steps, applied in this order:
    - every run of characters outside ``[0-9a-zA-Z_]`` becomes a single ``_``
    - a leading digit gets an ``i_`` prefix
    - names listed in ``KEYWORDS`` get a trailing ``_``
    - a leading ``__`` loses its first underscore

    :param spec_name: the (non-empty) name as it appears in the spec
    :return: the sanitized name
    """
    name = re.sub(r"[^0-9a-zA-Z_]+", "_", spec_name)

    if name[0].isnumeric():
        name = f"i_{name}"

    if is_keyword(name):
        name = f"{name}_"

    return name[1:] if name.startswith("__") else name
1✔
53

54

55
def html_to_rst(html: str):
    """
    Convert an HTML documentation snippet to reStructuredText using pypandoc.

    After the conversion, the backslash escapes in front of ``_``, ``|`` and spaces are removed,
    every remaining backslash is doubled, and surrounding whitespace is stripped.

    :param html: the HTML text to convert
    :return: the cleaned-up rst text
    """
    # imported lazily, so pypandoc is only needed when docs are actually generated
    import pypandoc

    doc = pypandoc.convert_text(html, "rst", format="html")

    # Drop the escapes of characters that need no escaping in the generated docstrings. The
    # patterns are spelled with explicit double backslashes: "\_" and friends are invalid escape
    # sequences, which newer Python versions report with a SyntaxWarning.
    for escaped, plain in (("\\_", "_"), ("\\|", "|"), ("\\ ", " ")):
        doc = doc.replace(escaped, plain)

    # The text is written into regular triple-quoted string literals of the generated module
    # (see the docstring writers in this file), so any backslash that is left must be doubled.
    doc = doc.replace("\\", "\\\\")

    return doc.strip()
×
65

66

67
class ShapeNode:
    """
    A shape of a service model together with the service it belongs to, which knows how to
    print itself as a Python type declaration.
    """

    # the service model the shape was taken from
    service: ServiceModel
    # the wrapped shape
    shape: Shape

    def __init__(self, service: ServiceModel, shape: Shape) -> None:
        super().__init__()
        self.service = service
        self.shape = shape

    def _find_operation(self, shape_attr: str) -> OperationModel | None:
        """
        Return the first operation of the service whose ``shape_attr`` shape (``input_shape`` or
        ``output_shape``) has the same sanitized name as this node's shape, or None.
        """
        for operation_name in self.service.operation_names:
            operation = self.service.operation_model(operation_name)
            candidate = getattr(operation, shape_attr)
            if candidate is None:
                continue

            if to_valid_python_name(self.shape.name) == to_valid_python_name(candidate.name):
                return operation

        return None

    @cached_property
    def request_operation(self) -> OperationModel | None:
        """The first operation that uses this shape as its input shape, if there is one."""
        return self._find_operation("input_shape")

    @cached_property
    def response_operation(self) -> OperationModel | None:
        """The first operation that uses this shape as its output shape, if there is one."""
        return self._find_operation("output_shape")

    @cached_property
    def is_request(self):
        """Whether some operation uses this shape as its input."""
        return self.request_operation is not None

    @cached_property
    def is_response(self):
        """Whether some operation uses this shape as its output."""
        return self.response_operation is not None

    @property
    def name(self) -> str:
        """The sanitized Python name of the shape."""
        return to_valid_python_name(self.shape.name)

    @cached_property
    def is_exception(self):
        """Truthy if the shape metadata has a truthy ``error`` or ``exception`` entry."""
        meta = self.shape.metadata
        return meta.get("error") or meta.get("exception")

    @property
    def is_primitive(self):
        """Whether the shape type is one of integer, boolean, float, double or string."""
        return self.shape.type_name in ("integer", "boolean", "float", "double", "string")

    @property
    def is_enum(self):
        """Truthy if the shape is a string shape that defines enum values."""
        return isinstance(self.shape, StringShape) and self.shape.enum

    @property
    def dependencies(self) -> list[str]:
        """
        The sanitized names of the shapes this shape refers to directly: the members of a
        structure, the member of a list, or the key and value of a map. Empty for anything else.
        """
        shape = self.shape

        if isinstance(shape, StructureShape):
            referenced = shape.members.values()
        elif isinstance(shape, ListShape):
            referenced = [shape.member]
        elif isinstance(shape, MapShape):
            referenced = [shape.key, shape.value]
        else:
            referenced = []

        return [to_valid_python_name(ref.name) for ref in referenced]

    def _print_structure_declaration(self, output, doc=True, quote_types=False):
        """
        Print a structure shape: exceptions become ``ServiceException`` subclasses, structures
        with a member named like a keyword use the functional ``TypedDict`` syntax, and all
        others become a ``ServiceRequest`` (request shapes) or ``TypedDict`` class.
        """
        if self.is_exception:
            self._print_as_class(output, "ServiceException", doc, quote_types)
        elif any(is_keyword(member) for member in self.shape.members):
            self._print_as_typed_dict(output, doc, quote_types)
        else:
            base = "ServiceRequest" if self.is_request else "TypedDict, total=False"
            self._print_as_class(output, base, doc, quote_types)

    def _print_as_class(self, output, base: str, doc=True, quote_types=False):
        """
        Print the shape as a class deriving from ``base`` with one annotated attribute per member.

        :param output: the stream to write to
        :param base: the text to put between the parentheses of the class statement
        :param doc: whether to print the shape documentation as class docstring
        :param quote_types: whether to put the member type names into quotes (forward references)
        """
        output.write(f"class {to_valid_python_name(self.shape.name)}({base}):\n")

        q = '"' if quote_types else ""

        if doc:
            self.print_shape_doc(output, self.shape)

        if self.is_exception:
            error_spec = self.shape.metadata.get("error", {})
            code = error_spec.get("code", self.shape.name)
            sender_fault = error_spec.get("senderFault", False)
            status_code = error_spec.get("httpStatusCode", 400)
            output.write(f'    code: str = "{code}"\n')
            output.write(f"    sender_fault: bool = {sender_fault}\n")
            output.write(f"    status_code: int = {status_code}\n")
        elif not self.shape.members:
            output.write("    pass\n")

        # Exceptions do not get attributes for the common error members "message" and "code":
        # - The message will always be the exception message (first argument of the exception
        #   class init)
        # - The code is already set above
        if self.is_exception:
            remaining = {
                key: member
                for key, member in self.shape.members.items()
                if key.lower() not in ("message", "code")
            }
        else:
            remaining = dict(self.shape.members)

        # a streaming request payload is rendered first
        if self.is_request and self.request_operation.has_streaming_input:
            payload: str = self.request_operation.input_shape.serialization.get("payload")
            stream_shape: Shape = self.request_operation.get_streaming_input()
            annotation = f"IO[{q}{to_valid_python_name(stream_shape.name)}{q}]"
            if payload not in self.shape.required_members:
                annotation = f"Optional[{annotation}]"
            output.write(f"    {payload}: {annotation}\n")
            del remaining[payload]

        # a streaming response payload is rendered first
        if self.is_response and self.response_operation.has_streaming_output:
            payload: str = self.response_operation.output_shape.serialization.get("payload")
            stream_shape: Shape = self.response_operation.get_streaming_output()
            shape_name = to_valid_python_name(stream_shape.name)
            annotation = (
                f"Union[{q}{shape_name}{q}, IO[{q}{shape_name}{q}], Iterable[{q}{shape_name}{q}]]"
            )
            if payload not in self.shape.required_members:
                annotation = f"Optional[{annotation}]"
            output.write(f"    {payload}: {annotation}\n")
            del remaining[payload]

        for key, member in remaining.items():
            type_name = f"{q}{to_valid_python_name(member.name)}{q}"
            if member.serialization.get("eventstream"):
                # event streams are never wrapped into Optional, required or not
                annotation = f"Iterator[{type_name}]"
            elif key in self.shape.required_members:
                annotation = type_name
            else:
                annotation = f"Optional[{type_name}]"
            output.write(f"    {key}: {annotation}\n")

    def _print_as_typed_dict(self, output, doc=True, quote_types=False):
        """
        Print the shape using the functional ``TypedDict("Name", {...}, total=False)`` syntax,
        where member names are dictionary keys. No trailing newline is written, and ``doc`` is
        not used by this variant.
        """
        name = to_valid_python_name(self.shape.name)
        output.write(f'{name} = TypedDict("{name}", {{\n')

        for key, member in self.shape.members.items():
            member_type = to_valid_python_name(member.name)
            # a member whose type is the type being declared is recursive and has to be a
            # forward reference, i.e., quoted
            q = '"' if quote_types or name == member_type else ""
            type_name = f"{q}{member_type}{q}"

            if member.serialization.get("eventstream"):
                annotation = f"Iterator[{type_name}]"
            elif key in self.shape.required_members:
                annotation = type_name
            else:
                annotation = f"Optional[{type_name}]"
            output.write(f'    "{key}": {annotation},\n')

        output.write("}, total=False)")

    def print_shape_doc(self, output, shape):
        """Print the documentation of ``shape`` as an indented docstring, unless it is empty."""
        rst = html_to_rst(shape.documentation)
        if not rst:
            return

        output.write('    """')
        output.write(f"{rst}\n")
        output.write('    """\n')

    def print_declaration(self, output, doc=True, quote_types=False):
        """
        Print the Python declaration of the shape, followed by a newline.

        :param output: the stream to write to
        :param doc: whether to print docstrings (passed on to structure declarations)
        :param quote_types: whether to quote referenced type names (forward references)
        """
        shape = self.shape
        name = to_valid_python_name(shape.name)

        q = '"' if quote_types else ""

        # plain aliases for all remaining shape types
        aliases = {
            "string": "str",
            "integer": "int",
            "long": "int",
            "double": "float",
            "float": "float",
            "boolean": "bool",
            # blobs are often associated with streaming payloads, but we handle that on operation
            # level, not on shape level
            "blob": "bytes",
            "timestamp": "datetime",
        }

        if isinstance(shape, StructureShape):
            self._print_structure_declaration(output, doc, quote_types)
        elif isinstance(shape, ListShape):
            member = to_valid_python_name(shape.member.name)
            output.write(f"{name} = List[{q}{member}{q}]")
        elif isinstance(shape, MapShape):
            key = to_valid_python_name(shape.key.name)
            value = to_valid_python_name(shape.value.name)
            output.write(f"{name} = Dict[{q}{key}{q}, {q}{value}{q}]")
        elif isinstance(shape, StringShape) and shape.enum:
            output.write(f"class {name}(StrEnum):\n")
            for value in shape.enum:
                output.write(f'    {to_valid_python_name(value)} = "{value}"\n')
        elif isinstance(shape, StringShape):
            output.write(f"{name} = str")
        elif shape.type_name in aliases:
            output.write(f"{name} = {aliases[shape.type_name]}")
        else:
            output.write(f"# unknown shape type for {name}: {shape.type_name}")
        # TODO: BoxedInteger?

        output.write("\n")

    def get_order(self):
        """
        Sort key for the stack of shape nodes before printing: 0 for non-enum primitives,
        1 for enums, 2 for exceptions, and 3 for all other types.
        """
        if self.is_primitive:
            return 1 if self.is_enum else 0

        return 2 if self.is_exception else 3
1✔
314

315

316
def generate_service_types(output, service: ServiceModel, doc=True):
    """
    Write the import header of a generated API module, followed by the declarations of all
    shapes of the given service.

    A shape is printed once all shapes it depends on have been printed. If a shape is reached a
    second time while it still has unprinted dependencies (a circular dependency), it is printed
    with quoted type names instead.

    :param output: the stream to write to
    :param service: the service model to generate the types for
    :param doc: whether to generate docstrings
    """
    header = (
        "from datetime import datetime\n",
        "from enum import StrEnum\n",
        "from typing import Dict, List, Optional, Iterator, Iterable, IO, Union, TypedDict\n",
        "\n",
        "from localstack.aws.api import handler, RequestContext, ServiceException, ServiceRequest",
        "\n",
    )
    for chunk in header:
        output.write(chunk)

    # ==================================== print type declarations
    nodes: dict[str, ShapeNode] = {
        to_valid_python_name(shape_name): ShapeNode(service, service.shape_for(shape_name))
        for shape_name in service.shape_names
    }

    # NOTE: an ``__all__`` listing of the node names is currently not emitted.

    printed: set[str] = set()
    visited: set[str] = set()

    # Names are popped from the end of the stack, so the lowest order has to end up last. Sort and
    # reverse are kept as two steps: ``sorted(..., reverse=True)`` would order ties differently.
    stack: list[str] = sorted(nodes, key=lambda node_name: nodes[node_name].get_order())
    stack.reverse()

    while stack:
        name = stack.pop()
        if name in printed:
            continue

        node = nodes[name]
        pending = [dep for dep in node.dependencies if dep not in printed]

        if pending and name not in visited:
            # first encounter: come back to this node after its dependencies were handled
            stack.append(name)
            stack.extend(pending)
            visited.add(name)
            continue

        # Either everything the node needs is printed already, or we are seeing the node again
        # with dependencies still missing; quoting the types then breaks the circular dependency.
        node.print_declaration(output, doc=doc, quote_types=bool(pending))
        printed.add(name)
1✔
366

367

368
def generate_service_api(output, service: ServiceModel, doc=True):
    """
    Write the API class of the given service: one ``@handler`` decorated method stub per
    operation, each raising ``NotImplementedError``.

    Input shape members become keyword parameters of the stub (required ones first, all others
    with a ``None`` default). If one of the parameter names cannot be rendered, the stub takes
    the whole request object instead and the handler is marked with ``expand=False``.

    :param output: the stream to write to
    :param service: the service model to generate the API for
    :param doc: whether to generate docstrings
    """
    class_name = snake_to_camel_case(service.service_name.replace("-", "_") + "_api")

    output.write(f"class {class_name}:\n")
    output.write("\n")
    output.write(f'    service = "{service.service_name}"\n')
    output.write(f'    version = "{service.api_version}"\n')

    for op_name in service.operation_names:
        operation: OperationModel = service.operation_model(op_name)

        fn_name = camel_to_snake_case(op_name)

        output_shape = (
            to_valid_python_name(operation.output_shape.name) if operation.output_shape else "None"
        )

        output.write("\n")
        parameters = OrderedDict()
        param_shapes = OrderedDict()

        input_shape = operation.input_shape
        if input_shape:
            # starts with all members, the required ones are removed below
            optional_members = list(input_shape.members)

            streaming_payload_member = (
                input_shape.serialization.get("payload") if operation.has_streaming_input else None
            )

            for member in input_shape.required_members:
                optional_members.remove(member)
                member_shape = input_shape.members[member]
                type_name = to_valid_python_name(member_shape.name)
                if member == streaming_payload_member:
                    type_name = f"IO[{type_name}]"
                param_name = xform_name(member)
                parameters[param_name] = type_name
                param_shapes[param_name] = member_shape

            for member in optional_members:
                member_shape = input_shape.members[member]
                type_name = to_valid_python_name(member_shape.name)
                if member == streaming_payload_member:
                    type_name = f"IO[{type_name}]"
                param_name = xform_name(member)
                param_shapes[param_name] = member_shape
                parameters[param_name] = f"{type_name} | None = None"

        if any(is_bad_param_name(param_name) for param_name in parameters):
            # if we cannot render the parameter name, don't expand the parameters in the handler
            param_list = f"request: {to_valid_python_name(input_shape.name)}" if input_shape else ""
            handler_args = f'"{operation.name}", expand=False'
        else:
            param_list = ", ".join(
                f"{param_name}: {annotation}" for param_name, annotation in parameters.items()
            )
            handler_args = f'"{operation.name}"'
        output.write(f"    @handler({handler_args})\n")

        # add the **kwargs in the end
        param_list = f"{param_list}, **kwargs" if param_list else "**kwargs"

        output.write(
            f"    def {fn_name}(self, context: RequestContext, {param_list}) -> {output_shape}:\n"
        )

        # convert html documentation to rst and print it into the signature
        if doc:
            summary = html_to_rst(operation.documentation)
            output.write('        """')
            output.write(f"{summary}\n")
            output.write("\n")

            # parameters
            for param_name, shape in param_shapes.items():
                # FIXME: this doesn't work properly
                param_doc = html_to_rst(shape.documentation)
                # only the text up to the first "." is used
                param_doc = param_doc.strip().partition(".")[0] + "."
                output.write(f":param {param_name}: {param_doc}\n")

            # return value
            if operation.output_shape:
                output.write(f":returns: {to_valid_python_name(operation.output_shape.name)}\n")

            # errors
            for error in operation.error_shapes:
                output.write(f":raises {to_valid_python_name(error.name)}:\n")

            output.write('        """\n')

        output.write("        raise NotImplementedError\n")
459

460

461
# Root click command group of this module; the subcommands below are attached to it with
# ``@scaffold.command``, and it is invoked when the module is run as a script.
@click.group()
def scaffold():
    pass
×
464

465

466
@scaffold.command(name="generate")
@click.argument("service", type=str)
@click.option("--doc/--no-doc", default=False, help="whether or not to generate docstrings")
@click.option(
    "--save/--print",
    default=False,
    help="whether or not to save the result into the api directory",
)
@click.option(
    "--path",
    default="./localstack-core/localstack/aws/api",
    help="the path where the api should be saved",
)
def generate(service: str, doc: bool, save: bool, path: str):
    """
    Generate types and API stubs for a given AWS service.

    SERVICE is the service to generate the stubs for (e.g., sqs, or cloudformation)
    """
    # NOTE: the docstring above is the help text of the command, keep it user-facing.
    from click import ClickException

    try:
        code = generate_code(service, doc=doc)
    except UnknownServiceError as e:
        # report a readable CLI error, but keep the original error as the cause
        raise ClickException(f"unknown service {service}") from e

    if not save:
        # either just print the code to stdout
        click.echo(code)
        return

    # or find the file path and write the code to that location
    create_code_directory(service, code, path)
    click.echo("done!")
×
500

501

502
def generate_code(service_name: str, doc: bool = False) -> str:
    """
    Generate the source code of the API module of a service: the type declarations, followed by
    the API class.

    :param service_name: the name of the service, passed to ``load_service``
    :param doc: whether to generate docstrings
    :return: the generated code
    """
    model = load_service(service_name)

    with io.StringIO() as buffer:
        generate_service_types(buffer, model, doc=doc)
        generate_service_api(buffer, model, doc=doc)
        return buffer.getvalue()
1✔
508

509

510
def create_code_directory(service_name: str, code: str, base_path: str):
    """
    Write the generated code to ``<base_path>/<package>/__init__.py``, creating the package
    directory if it does not exist yet.

    The package name is the service name with ``-`` replaced by ``_``.

    :param service_name: the name of the service the code was generated for
    :param code: the generated code
    :param base_path: the directory containing the API packages (has to exist already)
    """
    service_name = service_name.replace("-", "_")
    # handle service names which are reserved keywords in python (f.e. lambda)
    if is_keyword(service_name):
        service_name += "_"
    path = Path(base_path, service_name)

    if not path.exists():
        click.echo(f"creating directory {path}")
        path.mkdir()

    file = path / "__init__.py"
    click.echo(f"writing to file {file}")
    # Python source files are UTF-8 by default, so do not depend on the locale encoding here
    file.write_text(code, encoding="utf-8")
×
524

525

526
@scaffold.command()
@click.option("--doc/--no-doc", default=False, help="whether or not to generate docstrings")
@click.option(
    "--path",
    default="./localstack-core/localstack/aws/api",
    help="the path in which to upgrade ASF APIs",
)
def upgrade(path: str, doc: bool = False):
    """
    Execute the code generation for all existing APIs.
    """
    # Every directory in the path that does not start with "__" is taken as an API package; the
    # service name is recovered by dropping trailing underscores and turning "_" back into "-".
    services = []
    for entry in Path(path).iterdir():
        if not entry.is_dir() or entry.name.startswith("__"):
            continue
        services.append(entry.name.rstrip("_").replace("_", "-"))

    # generate the services in a pool of worker processes
    jobs = [(service, path, doc) for service in services]
    with Pool() as pool:
        pool.starmap(_do_generate_code, jobs)

    click.echo("done!")
×
547

548

549
def _do_generate_code(service: str, path: str, doc: bool):
    """
    Generate the code of a single service and write it below ``path``. Services that are not
    known are reported and skipped. Used as the per-service job of the ``upgrade`` command.
    """
    try:
        code = generate_code(service, doc)
    except UnknownServiceError:
        click.echo(f"unknown service {service}! skipping...")
    else:
        create_code_directory(service, code, base_path=path)
×
556

557

558
# run the command group when the module is executed as a script
if __name__ == "__main__":
    scaffold()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc