• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19218862132

07 Nov 2025 04:44PM UTC coverage: 86.895% (+0.01%) from 86.884%
19218862132

push

github

web-flow
Sns:v2 platform endpoint operations (#13327)

Co-authored-by: Benjamin Simon <benjh.simon@gmail.com>

83 of 88 new or added lines in 3 files covered. (94.32%)

29 existing lines in 1 file now uncovered.

68575 of 78917 relevant lines covered (86.9%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.52
/localstack-core/localstack/aws/scaffold.py
1
import io
1✔
2
import keyword
1✔
3
import re
1✔
4
from collections import OrderedDict
1✔
5
from functools import cached_property
1✔
6
from multiprocessing import Pool
1✔
7
from pathlib import Path
1✔
8

9
import click
1✔
10
from botocore import xform_name
1✔
11
from botocore.exceptions import UnknownServiceError
1✔
12
from botocore.model import (
1✔
13
    ListShape,
14
    MapShape,
15
    OperationModel,
16
    ServiceModel,
17
    Shape,
18
    StringShape,
19
    StructureShape,
20
)
21

22
from localstack.aws.spec import load_service
1✔
23
from localstack.utils.common import camel_to_snake_case, snake_to_camel_case
1✔
24

25
# Some minification packages might treat "type" as a keyword, some specs define shapes called like the type "Optional"
26
KEYWORDS = list(keyword.kwlist) + ["type", "Optional", "Union"]
1✔
27
is_keyword = KEYWORDS.__contains__
1✔
28

29

30
def is_bad_param_name(name: str) -> bool:
1✔
31
    if name == "context":
1✔
32
        return True
1✔
33

34
    if is_keyword(name):
1✔
35
        return True
1✔
36

37
    return False
1✔
38

39

40
def to_valid_python_name(spec_name: str) -> str:
1✔
41
    sanitized = re.sub(r"[^0-9a-zA-Z_]+", "_", spec_name)
1✔
42

43
    if sanitized[0].isnumeric():
1✔
44
        sanitized = "i_" + sanitized
1✔
45

46
    if is_keyword(sanitized):
1✔
47
        sanitized += "_"
1✔
48

49
    if sanitized.startswith("__"):
1✔
50
        sanitized = sanitized[1:]
1✔
51

52
    return sanitized
1✔
53

54

55
def html_to_rst(html: str):
1✔
56
    import pypandoc
×
57

58
    doc = pypandoc.convert_text(html, "rst", format="html")
×
59
    doc = doc.replace("\_", "_")  # noqa: W605
×
60
    doc = doc.replace("\|", "|")  # noqa: W605
×
61
    doc = doc.replace("\ ", " ")  # noqa: W605
×
62
    doc = doc.replace("\\", "\\\\")  # noqa: W605
×
63
    rst = doc.strip()
×
64
    return rst
×
65

66

67
class ShapeNode:
1✔
68
    service: ServiceModel
1✔
69
    shape: Shape
1✔
70

71
    def __init__(self, service: ServiceModel, shape: Shape) -> None:
1✔
72
        super().__init__()
1✔
73
        self.service = service
1✔
74
        self.shape = shape
1✔
75

76
    @cached_property
1✔
77
    def request_operation(self) -> OperationModel | None:
1✔
78
        for operation_name in self.service.operation_names:
1✔
79
            operation = self.service.operation_model(operation_name)
1✔
80
            if operation.input_shape is None:
1✔
81
                continue
1✔
82

83
            if to_valid_python_name(self.shape.name) == to_valid_python_name(
1✔
84
                operation.input_shape.name
85
            ):
86
                return operation
1✔
87

88
        return None
1✔
89

90
    @cached_property
1✔
91
    def response_operation(self) -> OperationModel | None:
1✔
92
        for operation_name in self.service.operation_names:
1✔
93
            operation = self.service.operation_model(operation_name)
1✔
94
            if operation.output_shape is None:
1✔
95
                continue
1✔
96

97
            if to_valid_python_name(self.shape.name) == to_valid_python_name(
1✔
98
                operation.output_shape.name
99
            ):
100
                return operation
1✔
101

102
        return None
1✔
103

104
    @cached_property
1✔
105
    def is_request(self):
1✔
106
        return self.request_operation is not None
1✔
107

108
    @cached_property
1✔
109
    def is_response(self):
1✔
110
        return self.response_operation is not None
1✔
111

112
    @property
1✔
113
    def name(self) -> str:
1✔
114
        return to_valid_python_name(self.shape.name)
×
115

116
    @cached_property
1✔
117
    def is_exception(self):
1✔
118
        metadata = self.shape.metadata
1✔
119
        return metadata.get("error") or metadata.get("exception")
1✔
120

121
    @property
1✔
122
    def is_primitive(self):
1✔
123
        return self.shape.type_name in ["integer", "boolean", "float", "double", "string"]
1✔
124

125
    @property
1✔
126
    def is_enum(self):
1✔
127
        return isinstance(self.shape, StringShape) and self.shape.enum
1✔
128

129
    @property
1✔
130
    def dependencies(self) -> list[str]:
1✔
131
        shape = self.shape
1✔
132

133
        if isinstance(shape, StructureShape):
1✔
134
            return [to_valid_python_name(v.name) for v in shape.members.values()]
1✔
135
        if isinstance(shape, ListShape):
1✔
136
            return [to_valid_python_name(shape.member.name)]
1✔
137
        if isinstance(shape, MapShape):
1✔
138
            return [to_valid_python_name(shape.key.name), to_valid_python_name(shape.value.name)]
1✔
139

140
        return []
1✔
141

142
    def _print_structure_declaration(self, output, doc=True, quote_types=False):
1✔
143
        if self.is_exception:
1✔
144
            self._print_as_class(output, "ServiceException", doc, quote_types)
1✔
145
            return
1✔
146

147
        if any(map(is_keyword, self.shape.members.keys())):
1✔
148
            self._print_as_typed_dict(output, doc, quote_types)
1✔
149
            return
1✔
150

151
        if self.is_request:
1✔
152
            base = "ServiceRequest"
1✔
153
        else:
154
            base = "TypedDict, total=False"
1✔
155

156
        self._print_as_class(output, base, doc, quote_types)
1✔
157

158
    def _print_as_class(self, output, base: str, doc=True, quote_types=False):
1✔
159
        output.write(f"class {to_valid_python_name(self.shape.name)}({base}):\n")
1✔
160

161
        q = '"' if quote_types else ""
1✔
162

163
        if doc:
1✔
164
            self.print_shape_doc(output, self.shape)
×
165

166
        if self.is_exception:
1✔
167
            error_spec = self.shape.metadata.get("error", {})
1✔
168
            output.write(f'    code: str = "{error_spec.get("code", self.shape.name)}"\n')
1✔
169
            output.write(f"    sender_fault: bool = {error_spec.get('senderFault', False)}\n")
1✔
170
            output.write(f"    status_code: int = {error_spec.get('httpStatusCode', 400)}\n")
1✔
171
        elif not self.shape.members:
1✔
172
            output.write("    pass\n")
1✔
173

174
        # Avoid generating members for the common error members:
175
        # - The message will always be the exception message (first argument of the exception class init)
176
        # - The code is already set above
177
        # - The type is the sender_fault which is already set above
178
        remaining_members = {
1✔
179
            k: v
180
            for k, v in self.shape.members.items()
181
            if not self.is_exception or k.lower() not in ["message", "code"]
182
        }
183

184
        # render any streaming payload first
185
        if self.is_request and self.request_operation.has_streaming_input:
1✔
186
            member: str = self.request_operation.input_shape.serialization.get("payload")
1✔
187
            shape: Shape = self.request_operation.get_streaming_input()
1✔
188
            if member in self.shape.required_members:
1✔
189
                output.write(f"    {member}: IO[{q}{to_valid_python_name(shape.name)}{q}]\n")
1✔
190
            else:
191
                output.write(f"    {member}: {q}IO[{to_valid_python_name(shape.name)}] | None{q}\n")
1✔
192
            del remaining_members[member]
1✔
193
        # render the streaming payload first
194
        if self.is_response and self.response_operation.has_streaming_output:
1✔
195
            member: str = self.response_operation.output_shape.serialization.get("payload")
1✔
196
            shape: Shape = self.response_operation.get_streaming_output()
1✔
197
            shape_name = to_valid_python_name(shape.name)
1✔
198
            if member in self.shape.required_members:
1✔
UNCOV
199
                output.write(
×
200
                    f"    {member}: {q}{shape_name} | IO[{shape_name}] | Iterable[{shape_name}]{q}\n"
201
                )
202
            else:
203
                output.write(
1✔
204
                    f"    {member}: {q}{shape_name} | IO[{shape_name}] | Iterable[{shape_name}] | None{q}\n"
205
                )
206
            del remaining_members[member]
1✔
207

208
        for k, v in remaining_members.items():
1✔
209
            shape_name = to_valid_python_name(v.name)
1✔
210
            if k in self.shape.required_members:
1✔
211
                if v.serialization.get("eventstream"):
1✔
212
                    output.write(f"    {k}: Iterator[{q}{shape_name}{q}]\n")
1✔
213
                else:
214
                    output.write(f"    {k}: {q}{shape_name}{q}\n")
1✔
215
            else:
216
                if v.serialization.get("eventstream"):
1✔
217
                    output.write(f"    {k}: Iterator[{q}{shape_name}{q}]\n")
1✔
218
                else:
219
                    output.write(f"    {k}: {q}{shape_name} | None{q}\n")
1✔
220

221
    def _print_as_typed_dict(self, output, doc=True, quote_types=False):
1✔
222
        name = to_valid_python_name(self.shape.name)
1✔
223
        output.write(f'{name} = TypedDict("{name}", {{\n')
1✔
224
        for k, v in self.shape.members.items():
1✔
225
            member_name = to_valid_python_name(v.name)
1✔
226
            # check if the member name is the same as the type name (recursive types need to use forward references)
227
            recursive_type = name == member_name
1✔
228
            q = '"' if quote_types or recursive_type else ""
1✔
229
            if k in self.shape.required_members:
1✔
230
                if v.serialization.get("eventstream"):
1✔
UNCOV
231
                    output.write(f'    "{k}": Iterator[{q}{member_name}{q}],\n')
×
232
                else:
233
                    output.write(f'    "{k}": {q}{member_name}{q},\n')
1✔
234
            else:
235
                if v.serialization.get("eventstream"):
1✔
UNCOV
236
                    output.write(f'    "{k}": Iterator[{q}{member_name}{q}],\n')
×
237
                else:
238
                    output.write(f'    "{k}": {q}{member_name} | None{q},\n')
1✔
239
        output.write("}, total=False)")
1✔
240

241
    def print_shape_doc(self, output, shape):
1✔
UNCOV
242
        html = shape.documentation
×
243
        rst = html_to_rst(html)
×
244
        if rst:
×
245
            output.write('    """')
×
246
            output.write(f"{rst}\n")
×
247
            output.write('    """\n')
×
248

249
    def print_declaration(self, output, doc=True, quote_types=False):
1✔
250
        shape = self.shape
1✔
251

252
        q = '"' if quote_types else ""
1✔
253

254
        if isinstance(shape, StructureShape):
1✔
255
            self._print_structure_declaration(output, doc, quote_types)
1✔
256
        elif isinstance(shape, ListShape):
1✔
257
            output.write(
1✔
258
                f"{to_valid_python_name(shape.name)} = list[{q}{to_valid_python_name(shape.member.name)}{q}]"
259
            )
260
        elif isinstance(shape, MapShape):
1✔
261
            output.write(
1✔
262
                f"{to_valid_python_name(shape.name)} = dict[{q}{to_valid_python_name(shape.key.name)}{q}, {q}{to_valid_python_name(shape.value.name)}{q}]"
263
            )
264
        elif isinstance(shape, StringShape):
1✔
265
            if shape.enum:
1✔
266
                output.write(f"class {to_valid_python_name(shape.name)}(StrEnum):\n")
1✔
267
                for value in shape.enum:
1✔
268
                    name = to_valid_python_name(value)
1✔
269
                    output.write(f'    {name} = "{value}"\n')
1✔
270
            else:
271
                output.write(f"{to_valid_python_name(shape.name)} = str")
1✔
272
        elif shape.type_name == "string":
1✔
UNCOV
273
            output.write(f"{to_valid_python_name(shape.name)} = str")
×
274
        elif shape.type_name == "integer":
1✔
275
            output.write(f"{to_valid_python_name(shape.name)} = int")
1✔
276
        elif shape.type_name == "long":
1✔
277
            output.write(f"{to_valid_python_name(shape.name)} = int")
1✔
278
        elif shape.type_name == "double":
1✔
279
            output.write(f"{to_valid_python_name(shape.name)} = float")
1✔
280
        elif shape.type_name == "float":
1✔
UNCOV
281
            output.write(f"{to_valid_python_name(shape.name)} = float")
×
282
        elif shape.type_name == "boolean":
1✔
283
            output.write(f"{to_valid_python_name(shape.name)} = bool")
1✔
284
        elif shape.type_name == "blob":
1✔
285
            # blobs are often associated with streaming payloads, but we handle that on operation level,
286
            # not on shape level
287
            output.write(f"{to_valid_python_name(shape.name)} = bytes")
1✔
288
        elif shape.type_name == "timestamp":
1✔
289
            output.write(f"{to_valid_python_name(shape.name)} = datetime")
1✔
290
        else:
UNCOV
291
            output.write(
×
292
                f"# unknown shape type for {to_valid_python_name(shape.name)}: {shape.type_name}"
293
            )
294
        # TODO: BoxedInteger?
295

296
        output.write("\n")
1✔
297

298
    def get_order(self):
1✔
299
        """
300
        Defines a basic order in which to sort the stack of shape nodes before printing.
301
        First all non-enum primitives are printed, then enums, then exceptions, then all other types.
302
        """
303
        if self.is_primitive:
1✔
304
            if self.is_enum:
1✔
305
                return 1
1✔
306
            else:
307
                return 0
1✔
308

309
        if self.is_exception:
1✔
310
            return 2
1✔
311

312
        return 3
1✔
313

314

315
def generate_service_types(output, service: ServiceModel, doc=True):
1✔
316
    output.write("from datetime import datetime\n")
1✔
317
    output.write("from enum import StrEnum\n")
1✔
318
    output.write("from typing import IO, TypedDict\n")
1✔
319
    output.write("from collections.abc import Iterable, Iterator\n")
1✔
320
    output.write("\n")
1✔
321
    output.write(
1✔
322
        "from localstack.aws.api import handler, RequestContext, ServiceException, ServiceRequest"
323
    )
324
    output.write("\n")
1✔
325

326
    # ==================================== print type declarations
327
    nodes: dict[str, ShapeNode] = {}
1✔
328

329
    for shape_name in service.shape_names:
1✔
330
        shape = service.shape_for(shape_name)
1✔
331
        nodes[to_valid_python_name(shape_name)] = ShapeNode(service, shape)
1✔
332

333
    # output.write("__all__ = [\n")
334
    # for name in nodes.keys():
335
    #     output.write(f'    "{name}",\n')
336
    # output.write("]\n")
337

338
    printed: set[str] = set()
1✔
339
    visited: set[str] = set()
1✔
340
    stack: list[str] = list(nodes.keys())
1✔
341

342
    stack = sorted(stack, key=lambda name: nodes[name].get_order())
1✔
343
    stack.reverse()
1✔
344

345
    while stack:
1✔
346
        name = stack.pop()
1✔
347
        if name in printed:
1✔
348
            continue
1✔
349
        node = nodes[name]
1✔
350

351
        dependencies = [dep for dep in node.dependencies if dep not in printed]
1✔
352

353
        if not dependencies:
1✔
354
            node.print_declaration(output, doc=doc)
1✔
355
            printed.add(name)
1✔
356
        elif name in visited:
1✔
357
            # break out of circular dependencies
358
            node.print_declaration(output, doc=doc, quote_types=True)
1✔
359
            printed.add(name)
1✔
360
        else:
361
            stack.append(name)
1✔
362
            stack.extend(dependencies)
1✔
363
            visited.add(name)
1✔
364

365

366
def generate_service_api(output, service: ServiceModel, doc=True):
1✔
367
    service_name = service.service_name.replace("-", "_")
1✔
368
    class_name = service_name + "_api"
1✔
369
    class_name = snake_to_camel_case(class_name)
1✔
370

371
    output.write(f"class {class_name}:\n")
1✔
372
    output.write("\n")
1✔
373
    output.write(f'    service: str = "{service.service_name}"\n')
1✔
374
    output.write(f'    version: str = "{service.api_version}"\n')
1✔
375
    for op_name in service.operation_names:
1✔
376
        operation: OperationModel = service.operation_model(op_name)
1✔
377

378
        fn_name = camel_to_snake_case(op_name)
1✔
379

380
        if operation.output_shape:
1✔
381
            output_shape = to_valid_python_name(operation.output_shape.name)
1✔
382
        else:
383
            output_shape = "None"
1✔
384

385
        output.write("\n")
1✔
386
        parameters = OrderedDict()
1✔
387
        param_shapes = OrderedDict()
1✔
388

389
        if input_shape := operation.input_shape:
1✔
390
            members = list(input_shape.members)
1✔
391

392
            streaming_payload_member = None
1✔
393
            if operation.has_streaming_input:
1✔
394
                streaming_payload_member = operation.input_shape.serialization.get("payload")
1✔
395

396
            for m in input_shape.required_members:
1✔
397
                members.remove(m)
1✔
398
                m_shape = input_shape.members[m]
1✔
399
                type_name = to_valid_python_name(m_shape.name)
1✔
400
                if m == streaming_payload_member:
1✔
401
                    type_name = f"IO[{type_name}]"
1✔
402
                parameters[xform_name(m)] = type_name
1✔
403
                param_shapes[xform_name(m)] = m_shape
1✔
404

405
            for m in members:
1✔
406
                m_shape = input_shape.members[m]
1✔
407
                param_shapes[xform_name(m)] = m_shape
1✔
408
                type_name = to_valid_python_name(m_shape.name)
1✔
409
                if m == streaming_payload_member:
1✔
410
                    type_name = f"IO[{type_name}]"
1✔
411
                parameters[xform_name(m)] = f"{type_name} | None = None"
1✔
412

413
        if any(map(is_bad_param_name, parameters.keys())):
1✔
414
            # if we cannot render the parameter name, don't expand the parameters in the handler
415
            param_list = f"request: {to_valid_python_name(input_shape.name)}" if input_shape else ""
1✔
416
            output.write(f'    @handler("{operation.name}", expand=False)\n')
1✔
417
        else:
418
            param_list = ", ".join([f"{k}: {v}" for k, v in parameters.items()])
1✔
419
            output.write(f'    @handler("{operation.name}")\n')
1✔
420

421
        # add the **kwargs in the end
422
        if param_list:
1✔
423
            param_list += ", **kwargs"
1✔
424
        else:
425
            param_list = "**kwargs"
1✔
426

427
        output.write(
1✔
428
            f"    def {fn_name}(self, context: RequestContext, {param_list}) -> {output_shape}:\n"
429
        )
430

431
        # convert html documentation to rst and print it into to the signature
432
        if doc:
1✔
UNCOV
433
            html = operation.documentation
×
UNCOV
434
            rst = html_to_rst(html)
×
435
            output.write('        """')
×
436
            output.write(f"{rst}\n")
×
437
            output.write("\n")
×
438

439
            # parameters
UNCOV
440
            for param_name, shape in param_shapes.items():
×
441
                # FIXME: this doesn't work properly
442
                rst = html_to_rst(shape.documentation)
×
UNCOV
443
                rst = rst.strip().split(".")[0] + "."
×
444
                output.write(f":param {param_name}: {rst}\n")
×
445

446
            # return value
UNCOV
447
            if operation.output_shape:
×
UNCOV
448
                output.write(f":returns: {to_valid_python_name(operation.output_shape.name)}\n")
×
449

450
            # errors
UNCOV
451
            for error in operation.error_shapes:
×
UNCOV
452
                output.write(f":raises {to_valid_python_name(error.name)}:\n")
×
453

454
            output.write('        """\n')
×
455

456
        output.write("        raise NotImplementedError\n")
457

458

459
@click.group()
1✔
460
def scaffold():
1✔
UNCOV
461
    pass
×
462

463

464
@scaffold.command(name="generate")
1✔
465
@click.argument("service", type=str)
1✔
466
@click.option("--doc/--no-doc", default=False, help="whether or not to generate docstrings")
1✔
467
@click.option(
1✔
468
    "--save/--print",
469
    default=False,
470
    help="whether or not to save the result into the api directory",
471
)
472
@click.option(
1✔
473
    "--path",
474
    default="./localstack-core/localstack/aws/api",
475
    help="the path where the api should be saved",
476
)
477
def generate(service: str, doc: bool, save: bool, path: str):
1✔
478
    """
479
    Generate types and API stubs for a given AWS service.
480

481
    SERVICE is the service to generate the stubs for (e.g., sqs, or cloudformation)
482
    """
483
    from click import ClickException
1✔
484

485
    try:
1✔
486
        code = generate_code(service, doc=doc)
1✔
UNCOV
487
    except UnknownServiceError:
×
UNCOV
488
        raise ClickException(f"unknown service {service}")
×
489

490
    if not save:
1✔
491
        # either just print the code to stdout
492
        click.echo(code)
1✔
493
        return
1✔
494

495
    # or find the file path and write the code to that location
UNCOV
496
    create_code_directory(service, code, path)
×
UNCOV
497
    click.echo("done!")
×
498

499

500
def generate_code(service_name: str, doc: bool = False) -> str:
1✔
501
    model = load_service(service_name)
1✔
502
    output = io.StringIO()
1✔
503
    generate_service_types(output, model, doc=doc)
1✔
504
    generate_service_api(output, model, doc=doc)
1✔
505
    return output.getvalue()
1✔
506

507

508
def create_code_directory(service_name: str, code: str, base_path: str):
1✔
UNCOV
509
    service_name = service_name.replace("-", "_")
×
510
    # handle service names which are reserved keywords in python (f.e. lambda)
511
    if is_keyword(service_name):
×
UNCOV
512
        service_name += "_"
×
513
    path = Path(base_path, service_name)
×
514

515
    if not path.exists():
×
UNCOV
516
        click.echo(f"creating directory {path}")
×
517
        path.mkdir()
×
518

519
    file = path / "__init__.py"
×
UNCOV
520
    click.echo(f"writing to file {file}")
×
521
    file.write_text(code)
×
522

523

524
@scaffold.command()
1✔
525
@click.option("--doc/--no-doc", default=False, help="whether or not to generate docstrings")
1✔
526
@click.option(
1✔
527
    "--path",
528
    default="./localstack-core/localstack/aws/api",
529
    help="the path in which to upgrade ASF APIs",
530
)
531
def upgrade(path: str, doc: bool = False):
1✔
532
    """
533
    Execute the code generation for all existing APIs.
534
    """
UNCOV
535
    services = [
×
536
        d.name.rstrip("_").replace("_", "-")
537
        for d in Path(path).iterdir()
538
        if d.is_dir() and not d.name.startswith("__")
539
    ]
540

UNCOV
541
    with Pool() as pool:
×
UNCOV
542
        pool.starmap(_do_generate_code, [(service, path, doc) for service in services])
×
543

544
    click.echo("done!")
×
545

546

547
def _do_generate_code(service: str, path: str, doc: bool):
1✔
UNCOV
548
    try:
×
UNCOV
549
        code = generate_code(service, doc)
×
550
    except UnknownServiceError:
×
551
        click.echo(f"unknown service {service}! skipping...")
×
552
        return
×
553
    create_code_directory(service, code, base_path=path)
×
554

555

556
if __name__ == "__main__":
557
    scaffold()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc