• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / b810cb22-0258-4411-b264-2fe3c023cb59

12 Mar 2025 08:50PM UTC coverage: 86.944% (+0.03%) from 86.915%
b810cb22-0258-4411-b264-2fe3c023cb59

push

circleci

web-flow
fix APIGW binary media types (#12371)

2 of 2 new or added lines in 1 file covered. (100.0%)

45 existing lines in 8 files now uncovered.

62242 of 71589 relevant lines covered (86.94%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.1
/localstack-core/localstack/services/stepfunctions/asl/parse/test_state/preprocessor.py
1
import enum
1✔
2
from typing import Final
1✔
3

4
from localstack.services.stepfunctions.asl.antlr.runtime.ASLParser import ASLParser
1✔
5
from localstack.services.stepfunctions.asl.component.common.parargs import Parameters
1✔
6
from localstack.services.stepfunctions.asl.component.common.path.input_path import InputPath
1✔
7
from localstack.services.stepfunctions.asl.component.common.path.result_path import ResultPath
1✔
8
from localstack.services.stepfunctions.asl.component.common.query_language import QueryLanguage
1✔
9
from localstack.services.stepfunctions.asl.component.common.result_selector import ResultSelector
1✔
10
from localstack.services.stepfunctions.asl.component.state.state import CommonStateField
1✔
11
from localstack.services.stepfunctions.asl.component.state.state_choice.state_choice import (
1✔
12
    StateChoice,
13
)
14
from localstack.services.stepfunctions.asl.component.state.state_execution.execute_state import (
1✔
15
    ExecutionState,
16
)
17
from localstack.services.stepfunctions.asl.component.state.state_pass.result import Result
1✔
18
from localstack.services.stepfunctions.asl.component.test_state.program.test_state_program import (
1✔
19
    TestStateProgram,
20
)
21
from localstack.services.stepfunctions.asl.component.test_state.state.test_state_state_props import (
1✔
22
    TestStateStateProps,
23
)
24
from localstack.services.stepfunctions.asl.eval.test_state.environment import TestStateEnvironment
1✔
25
from localstack.services.stepfunctions.asl.parse.preprocessor import Preprocessor
1✔
26
from localstack.services.stepfunctions.asl.utils.encoding import to_json_str
1✔
27

28

29
class InspectionDataKey(enum.Enum):
    """Names of the inspection data fields.

    Each member's ``value`` is the string used as the key when an entry is
    written into ``env.inspection_data`` by the decorators in this module.
    """

    # Input-side processing steps.
    INPUT = "input"
    AFTER_INPUT_PATH = "afterInputPath"
    AFTER_PARAMETERS = "afterParameters"

    # Result-side processing steps.
    RESULT = "result"
    AFTER_RESULT_SELECTOR = "afterResultSelector"
    AFTER_RESULT_PATH = "afterResultPath"

    # NOTE(review): INPUT, REQUEST and RESPONSE are not written by anything in this
    # module; presumably they are populated elsewhere - confirm against callers.
    REQUEST = "request"
    RESPONSE = "response"
1✔
38

39

40
def _decorated_updated_choice_inspection_data(method):
1✔
41
    def wrapper(env: TestStateEnvironment, *args, **kwargs):
1✔
42
        method(env, *args, **kwargs)
1✔
43
        env.set_choice_selected(env.next_state_name)
1✔
44

45
    return wrapper
1✔
46

47

48
def _decorated_updates_inspection_data(method, inspection_data_key: InspectionDataKey):
    """Wrap an evaluation callable so its stack output is stored as inspection data.

    The returned callable invokes ``method`` with the arguments it receives,
    converts the element on top of ``env.stack`` with ``to_json_str`` and stores
    the result in ``env.inspection_data`` under ``inspection_data_key.value``.

    :param method: callable invoked as ``method(env, *args, **kwargs)``.
    :param inspection_data_key: member whose ``value`` is used as the storage key.
    :return: a wrapper with the same call shape that forwards ``method``'s
        return value and keeps its metadata (name, docstring, ``__wrapped__``).
    """
    # Function-scope import keeps this helper self-contained.
    import functools

    @functools.wraps(method)
    def wrapper(env: TestStateEnvironment, *args, **kwargs):
        outcome = method(env, *args, **kwargs)
        # Read the top of the stack only after the wrapped evaluation has run.
        serialised_top = to_json_str(env.stack[-1])
        # We know that the enum value used here corresponds to a supported inspection data field by design.
        env.inspection_data[inspection_data_key.value] = serialised_top  # noqa
        # Forward the wrapped callable's result instead of silently dropping it.
        return outcome

    return wrapper
1✔
56

57

58
def _decorate_state_field(state_field: CommonStateField) -> None:
    """Patch a state's evaluation entry point in place so it reports inspection data.

    - ``ExecutionState``: ``_eval_execution`` is wrapped so that the value on top
      of the stack is stored under ``InspectionDataKey.RESULT``.
    - ``StateChoice``: ``_eval_body`` is wrapped so that the selected next state
      is recorded.

    Any other state type is left untouched.

    :param state_field: the state object to patch.
    """
    # As part of the decoration process, we intentionally access protected members
    # to facilitate the decorators' functionality.
    if isinstance(state_field, ExecutionState):
        original_eval = state_field._eval_execution  # noqa
        state_field._eval_execution = _decorated_updates_inspection_data(
            method=original_eval,
            inspection_data_key=InspectionDataKey.RESULT,
        )
        return

    if isinstance(state_field, StateChoice):
        original_eval = state_field._eval_body  # noqa
        state_field._eval_body = _decorated_updated_choice_inspection_data(method=original_eval)
72

73

74
class TestStatePreprocessor(Preprocessor):
    """Preprocessor that turns a single state declaration into a ``TestStateProgram``.

    The InputPath, Parameters, ResultSelector, ResultPath and Result components
    built by the parent visitor have their ``_eval_body`` wrapped with
    ``_decorated_updates_inspection_data`` under the matching ``InspectionDataKey``.
    """

    STATE_NAME: Final[str] = "TestState"

    @staticmethod
    def _record_eval_body_output(component, key: InspectionDataKey):
        """Wrap ``component._eval_body`` so its output is stored under ``key``; return ``component``."""
        # The protected member is accessed on purpose: wrapping it is the whole point.
        component._eval_body = _decorated_updates_inspection_data(
            method=component._eval_body,  # noqa
            inspection_data_key=key,
        )
        return component

    def visitState_decl_body(self, ctx: ASLParser.State_decl_bodyContext) -> TestStateProgram:
        """Build the state named ``STATE_NAME`` from ``ctx`` and wrap it in a program."""
        self._open_query_language_scope(ctx)

        props = TestStateStateProps()
        props.name = self.STATE_NAME
        for node in ctx.children:
            props.add(self.visit(node))

        state_field = self._common_state_field_of(state_props=props)
        # NOTE(review): the query language is added to the props only after the state
        # field has been built from them - confirm this ordering is intended.
        if props.get(QueryLanguage) is None:
            props.add(self._get_current_query_language())
        _decorate_state_field(state_field)

        self._close_query_language_scope()
        return TestStateProgram(state_field)

    def visitInput_path_decl(self, ctx: ASLParser.Input_path_declContext) -> InputPath:
        """Return the parent's InputPath, reporting under ``AFTER_INPUT_PATH``."""
        return self._record_eval_body_output(
            super().visitInput_path_decl(ctx=ctx), InspectionDataKey.AFTER_INPUT_PATH
        )

    def visitParameters_decl(self, ctx: ASLParser.Parameters_declContext) -> Parameters:
        """Return the parent's Parameters, reporting under ``AFTER_PARAMETERS``."""
        return self._record_eval_body_output(
            super().visitParameters_decl(ctx=ctx), InspectionDataKey.AFTER_PARAMETERS
        )

    def visitResult_selector_decl(
        self, ctx: ASLParser.Result_selector_declContext
    ) -> ResultSelector:
        """Return the parent's ResultSelector, reporting under ``AFTER_RESULT_SELECTOR``."""
        return self._record_eval_body_output(
            super().visitResult_selector_decl(ctx=ctx), InspectionDataKey.AFTER_RESULT_SELECTOR
        )

    def visitResult_path_decl(self, ctx: ASLParser.Result_path_declContext) -> ResultPath:
        """Return the parent's ResultPath, reporting under ``AFTER_RESULT_PATH``."""
        return self._record_eval_body_output(
            super().visitResult_path_decl(ctx=ctx), InspectionDataKey.AFTER_RESULT_PATH
        )

    def visitResult_decl(self, ctx: ASLParser.Result_declContext) -> Result:
        """Return the parent's Result, reporting under ``RESULT``."""
        return self._record_eval_body_output(
            super().visitResult_decl(ctx=ctx), InspectionDataKey.RESULT
        )
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc