• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19915770841

04 Dec 2025 12:59AM UTC coverage: 86.897% (-0.008%) from 86.905%
19915770841

push

github

web-flow
CFn: configure wait time for polling resources (#13455)

19 of 23 new or added lines in 4 files covered. (82.61%)

93 existing lines in 4 files now uncovered.

69758 of 80277 relevant lines covered (86.9%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.67
/localstack-core/localstack/services/stepfunctions/provider.py
1
import copy
1✔
2
import datetime
1✔
3
import json
1✔
4
import logging
1✔
5
import re
1✔
6
import time
1✔
7
from typing import Final
1✔
8

9
from localstack.aws.api import CommonServiceException, RequestContext
1✔
10
from localstack.aws.api.stepfunctions import (
1✔
11
    ActivityDoesNotExist,
12
    AliasDescription,
13
    Arn,
14
    CharacterRestrictedName,
15
    ConflictException,
16
    CreateActivityOutput,
17
    CreateStateMachineAliasOutput,
18
    CreateStateMachineInput,
19
    CreateStateMachineOutput,
20
    Definition,
21
    DeleteActivityOutput,
22
    DeleteStateMachineAliasOutput,
23
    DeleteStateMachineOutput,
24
    DeleteStateMachineVersionOutput,
25
    DescribeActivityOutput,
26
    DescribeExecutionOutput,
27
    DescribeMapRunOutput,
28
    DescribeStateMachineAliasOutput,
29
    DescribeStateMachineForExecutionOutput,
30
    DescribeStateMachineOutput,
31
    EncryptionConfiguration,
32
    ExecutionDoesNotExist,
33
    ExecutionList,
34
    ExecutionRedriveFilter,
35
    ExecutionStatus,
36
    GetActivityTaskOutput,
37
    GetExecutionHistoryOutput,
38
    IncludedData,
39
    IncludeExecutionDataGetExecutionHistory,
40
    InspectionLevel,
41
    InvalidArn,
42
    InvalidDefinition,
43
    InvalidExecutionInput,
44
    InvalidLoggingConfiguration,
45
    InvalidName,
46
    InvalidToken,
47
    ListActivitiesOutput,
48
    ListExecutionsOutput,
49
    ListExecutionsPageToken,
50
    ListMapRunsOutput,
51
    ListStateMachineAliasesOutput,
52
    ListStateMachinesOutput,
53
    ListStateMachineVersionsOutput,
54
    ListTagsForResourceOutput,
55
    LoggingConfiguration,
56
    LogLevel,
57
    LongArn,
58
    MaxConcurrency,
59
    MissingRequiredParameter,
60
    MockInput,
61
    Name,
62
    PageSize,
63
    PageToken,
64
    Publish,
65
    PublishStateMachineVersionOutput,
66
    ResourceNotFound,
67
    ReverseOrder,
68
    RevisionId,
69
    RoutingConfigurationList,
70
    SendTaskFailureOutput,
71
    SendTaskHeartbeatOutput,
72
    SendTaskSuccessOutput,
73
    SensitiveCause,
74
    SensitiveData,
75
    SensitiveError,
76
    StartExecutionOutput,
77
    StartSyncExecutionOutput,
78
    StateMachineAliasList,
79
    StateMachineAlreadyExists,
80
    StateMachineDoesNotExist,
81
    StateMachineList,
82
    StateMachineType,
83
    StateMachineTypeNotSupported,
84
    StepfunctionsApi,
85
    StopExecutionOutput,
86
    TagKeyList,
87
    TagList,
88
    TagResourceOutput,
89
    TaskDoesNotExist,
90
    TaskTimedOut,
91
    TaskToken,
92
    TestStateInput,
93
    TestStateOutput,
94
    ToleratedFailureCount,
95
    ToleratedFailurePercentage,
96
    TraceHeader,
97
    TracingConfiguration,
98
    UntagResourceOutput,
99
    UpdateMapRunOutput,
100
    UpdateStateMachineAliasOutput,
101
    UpdateStateMachineOutput,
102
    ValidateStateMachineDefinitionDiagnostic,
103
    ValidateStateMachineDefinitionDiagnosticList,
104
    ValidateStateMachineDefinitionInput,
105
    ValidateStateMachineDefinitionOutput,
106
    ValidateStateMachineDefinitionResultCode,
107
    ValidateStateMachineDefinitionSeverity,
108
    ValidationException,
109
    VersionDescription,
110
)
111
from localstack.services.plugins import ServiceLifecycleHook
1✔
112
from localstack.services.stepfunctions.asl.component.state.state_execution.state_map.iteration.itemprocessor.map_run_record import (
1✔
113
    MapRunRecord,
114
)
115
from localstack.services.stepfunctions.asl.eval.callback.callback import (
1✔
116
    ActivityCallbackEndpoint,
117
    CallbackConsumerTimeout,
118
    CallbackNotifyConsumerError,
119
    CallbackOutcomeFailure,
120
    CallbackOutcomeSuccess,
121
)
122
from localstack.services.stepfunctions.asl.eval.event.logging import (
1✔
123
    CloudWatchLoggingConfiguration,
124
    CloudWatchLoggingSession,
125
)
126
from localstack.services.stepfunctions.asl.parse.asl_parser import (
1✔
127
    ASLParserException,
128
)
129
from localstack.services.stepfunctions.asl.static_analyser.express_static_analyser import (
1✔
130
    ExpressStaticAnalyser,
131
)
132
from localstack.services.stepfunctions.asl.static_analyser.static_analyser import (
1✔
133
    StaticAnalyser,
134
)
135
from localstack.services.stepfunctions.asl.static_analyser.test_state.test_state_analyser import (
1✔
136
    TestStateStaticAnalyser,
137
)
138
from localstack.services.stepfunctions.asl.static_analyser.usage_metrics_static_analyser import (
1✔
139
    UsageMetricsStaticAnalyser,
140
)
141
from localstack.services.stepfunctions.backend.activity import Activity, ActivityTask
1✔
142
from localstack.services.stepfunctions.backend.alias import Alias
1✔
143
from localstack.services.stepfunctions.backend.execution import Execution, SyncExecution
1✔
144
from localstack.services.stepfunctions.backend.state_machine import (
1✔
145
    StateMachineInstance,
146
    StateMachineRevision,
147
    StateMachineVersion,
148
    TestStateMachine,
149
)
150
from localstack.services.stepfunctions.backend.store import SFNStore, sfn_stores
1✔
151
from localstack.services.stepfunctions.backend.test_state.execution import (
1✔
152
    TestStateExecution,
153
)
154
from localstack.services.stepfunctions.backend.test_state.test_state_mock import TestStateMock
1✔
155
from localstack.services.stepfunctions.local_mocking.mock_config import (
1✔
156
    LocalMockTestCase,
157
    load_local_mock_test_case_for,
158
)
159
from localstack.services.stepfunctions.stepfunctions_utils import (
1✔
160
    assert_pagination_parameters_valid,
161
    get_next_page_token_from_arn,
162
    normalise_max_results,
163
)
164
from localstack.state import StateVisitor
1✔
165
from localstack.utils.aws import arns
1✔
166
from localstack.utils.aws.arns import (
1✔
167
    ARN_PARTITION_REGEX,
168
    stepfunctions_activity_arn,
169
    stepfunctions_express_execution_arn,
170
    stepfunctions_standard_execution_arn,
171
    stepfunctions_state_machine_arn,
172
)
173
from localstack.utils.collections import PaginatedList
1✔
174
from localstack.utils.strings import long_uid, short_uid
1✔
175

176
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
1✔
177

178

179
class StepFunctionsProvider(StepfunctionsApi, ServiceLifecycleHook):
1✔
180
    # Upper bound, in seconds, for a timeout. NOTE(review): presumably applies to the TestState API going
    # by the name; the usage site is not visible in this part of the file — confirm.
    _TEST_STATE_MAX_TIMEOUT_SECONDS: Final[int] = 300  # 5 minutes.
1✔
181

182
    @staticmethod
1✔
183
    def get_store(context: RequestContext) -> SFNStore:
        """Return the Step Functions store for the account and region of the given request context."""
        account_stores = sfn_stores[context.account_id]
        return account_stores[context.region]
1✔
185

186
    def accept_state_visitor(self, visitor: StateVisitor):
        """Pass the module-level ``sfn_stores`` container to ``visitor.visit``."""
        visitor.visit(sfn_stores)
×
188

189
    # State machine ARN: "<partition>:states:<region>:<12-digit account>:stateMachine:<name>", optionally
    # followed by a numeric ":<digits>" segment, further ":<segment>" parts, and a "#<suffix>" part.
    _STATE_MACHINE_ARN_REGEX: Final[re.Pattern] = re.compile(
        rf"{ARN_PARTITION_REGEX}:states:[a-z0-9-]+:[0-9]{{12}}:stateMachine:[a-zA-Z0-9-_.]+(:\d+)?(:[a-zA-Z0-9-_.]+)*(?:#[a-zA-Z0-9-_]+)?$"
    )

    # Execution-context ARN: same layout, with a resource type of stateMachine, execution or express,
    # and without the optional "#<suffix>" part.
    _STATE_MACHINE_EXECUTION_ARN_REGEX: Final[re.Pattern] = re.compile(
        rf"{ARN_PARTITION_REGEX}:states:[a-z0-9-]+:[0-9]{{12}}:(stateMachine|execution|express):[a-zA-Z0-9-_.]+(:\d+)?(:[a-zA-Z0-9-_.]+)*$"
    )

    # Activity ARN: the activity name part is limited to 1-80 characters.
    _ACTIVITY_ARN_REGEX: Final[re.Pattern] = re.compile(
        rf"{ARN_PARTITION_REGEX}:states:[a-z0-9-]+:[0-9]{{12}}:activity:[a-zA-Z0-9-_\.]{{1,80}}$"
    )

    # Alias ARN: "<state machine ARN>:<alias name>", where the alias name starts with at least one
    # non-digit character.
    _ALIAS_ARN_REGEX: Final[re.Pattern] = re.compile(
        rf"{ARN_PARTITION_REGEX}:states:[a-z0-9-]+:[0-9]{{12}}:stateMachine:[A-Za-z0-9_.-]+:[A-Za-z_.-]+[A-Za-z0-9_.-]{{0,80}}$"
    )

    # Alias name: letters, digits, "_", "-" and "." only, and at least one character that is not a digit.
    _ALIAS_NAME_REGEX: Final[re.Pattern] = re.compile(r"^(?=.*[a-zA-Z_\-\.])[a-zA-Z0-9_\-\.]+$")
1✔
206

207
    @staticmethod
1✔
208
    def _validate_state_machine_arn(state_machine_arn: str) -> None:
        """Raise ``InvalidArn`` unless the value matches ``_STATE_MACHINE_ARN_REGEX`` from its start."""
        # TODO: InvalidArn exception message do not communicate which part of the ARN is incorrect.
        arn_match = StepFunctionsProvider._STATE_MACHINE_ARN_REGEX.match(state_machine_arn)
        if arn_match is None:
            raise InvalidArn(f"Invalid arn: '{state_machine_arn}'")
1✔
212

213
    @staticmethod
1✔
214
    def _raise_state_machine_does_not_exist(state_machine_arn: str) -> None:
        """Unconditionally raise ``StateMachineDoesNotExist`` for the given ARN; never returns normally."""
        error_message = f"State Machine Does Not Exist: '{state_machine_arn}'"
        raise StateMachineDoesNotExist(error_message)
1✔
216

217
    @staticmethod
1✔
218
    def _validate_state_machine_execution_arn(execution_arn: str) -> None:
        """Raise ``InvalidArn`` unless the value matches ``_STATE_MACHINE_EXECUTION_ARN_REGEX`` from its start."""
        # TODO: InvalidArn exception message do not communicate which part of the ARN is incorrect.
        arn_match = StepFunctionsProvider._STATE_MACHINE_EXECUTION_ARN_REGEX.match(execution_arn)
        if arn_match is None:
            raise InvalidArn(f"Invalid arn: '{execution_arn}'")
1✔
222

223
    @staticmethod
1✔
224
    def _validate_activity_arn(activity_arn: str) -> None:
        """Raise ``InvalidArn`` unless the value matches ``_ACTIVITY_ARN_REGEX`` from its start."""
        # TODO: InvalidArn exception message do not communicate which part of the ARN is incorrect.
        arn_match = StepFunctionsProvider._ACTIVITY_ARN_REGEX.match(activity_arn)
        if arn_match is None:
            raise InvalidArn(f"Invalid arn: '{activity_arn}'")
1✔
228

229
    @staticmethod
1✔
230
    def _validate_state_machine_alias_arn(state_machine_alias_arn: Arn) -> None:
        """Raise ``InvalidArn`` unless the value matches ``_ALIAS_ARN_REGEX`` from its start."""
        arn_match = StepFunctionsProvider._ALIAS_ARN_REGEX.match(state_machine_alias_arn)
        if arn_match is None:
            raise InvalidArn(f"Invalid arn: '{state_machine_alias_arn}'")
×
233

234
    def _raise_state_machine_type_not_supported(self):
        """Unconditionally raise ``StateMachineTypeNotSupported``; never returns normally."""
        error_message = "This operation is not supported by this type of state machine"
        raise StateMachineTypeNotSupported(error_message)
238

239
    @staticmethod
1✔
240
    def _raise_resource_type_not_in_context(resource_type: str) -> None:
        """Unconditionally raise ``InvalidArn`` naming the lower-cased resource type; never returns normally."""
        raise InvalidArn(
            f"Invalid Arn: 'Resource type not valid in this context: {resource_type.lower()}'"
        )
245

246
    @staticmethod
1✔
247
    def _validate_test_state_mock_input(mock_input: MockInput) -> None:
        """Raise ``ValidationException`` when the mock input defines both 'result' and 'errorOutput'."""
        provided_keys = mock_input.keys()
        if "result" in provided_keys and "errorOutput" in provided_keys:
            # FIXME create proper error
            raise ValidationException("Cannot define both 'result' and 'errorOutput'")
×
251

252
    @staticmethod
1✔
253
    def _validate_activity_name(name: str) -> None:
        """Raise ``InvalidName`` when the activity name has a bad length or contains a forbidden character.

        Following the AWS StepFunctions documentation, the name must be 1-80 characters long and must
        not contain:
        - white space
        - brackets < > { } [ ]
        - wildcard characters ? *
        - special characters " # % \\ ^ | ~ ` $ & , ; : /
        - control characters (U+0000-001F, U+007F-009F)
        https://docs.aws.amazon.com/step-functions/latest/apireference/API_CreateActivity.html#API_CreateActivity_RequestSyntax
        """
        name_length = len(name)
        if name_length < 1 or name_length > 80:
            raise InvalidName(f"Invalid Name: '{name}'")

        forbidden_chars = set(' <>{}[]?*"#%\\^|~`$&,;:/')
        # Control characters: U+0000-001F and U+007F-009F.
        forbidden_chars.update(map(chr, range(32)))
        forbidden_chars.update(map(chr, range(127, 160)))

        if any(char in forbidden_chars for char in name):
            raise InvalidName(f"Invalid Name: '{name}'")
1✔
269

270
    @staticmethod
1✔
271
    def _validate_state_machine_alias_name(name: CharacterRestrictedName) -> None:
        """Raise ``ValidationException`` when the alias name is longer than 80 characters or does not
        match ``_ALIAS_NAME_REGEX``. The length check runs first.
        """
        message_prefix = (
            f"1 validation error detected: Value '{name}' at 'name' failed to satisfy constraint: "
        )
        if len(name) > 80:
            raise ValidationException(
                message_prefix + "Member must have length less than or equal to 80"
            )
        if StepFunctionsProvider._ALIAS_NAME_REGEX.match(name) is None:
            # TODO: explore more error cases in which more than one validation error may occur which results
            #  in the counter of the message being greater than 1.
            raise ValidationException(
                message_prefix
                + "Member must satisfy regular expression pattern: ^(?=.*[a-zA-Z_\\-\\.])[a-zA-Z0-9_\\-\\.]+$"
            )
285

286
    def _get_execution(self, context: RequestContext, execution_arn: Arn) -> Execution:
        """Return the execution stored under ``execution_arn`` in the request's store.

        Raises ``ExecutionDoesNotExist`` when the store holds no (truthy) entry for that ARN.
        """
        executions = self.get_store(context).executions
        found: Execution | None = executions.get(execution_arn)
        if found:
            return found
        raise ExecutionDoesNotExist(f"Execution Does Not Exist: '{execution_arn}'")
1✔
291

292
    def _get_executions(
        self,
        context: RequestContext,
        execution_status: ExecutionStatus | None = None,
    ):
        """Return a list of the executions in the request's store.

        When ``execution_status`` is truthy, only executions whose ``exec_status`` equals it are
        returned; otherwise all executions are returned.
        """
        stored_executions = self.get_store(context).executions.values()
        if not execution_status:
            return list(stored_executions)
        return [
            candidate
            for candidate in stored_executions
            if candidate.exec_status == execution_status
        ]
1✔
307

308
    def _get_activity(self, context: RequestContext, activity_arn: Arn) -> Activity:
        """Return the activity stored under ``activity_arn`` in the request's store.

        Raises ``ActivityDoesNotExist`` when the store has no entry for that ARN.
        """
        activities = self.get_store(context).activities
        found: Activity | None = activities.get(activity_arn, None)
        if found is not None:
            return found
        raise ActivityDoesNotExist(f"Activity Does Not Exist: '{activity_arn}'")
1✔
313

314
    def _idempotent_revision(
        self,
        context: RequestContext,
        name: str,
        definition: Definition,
        state_machine_type: StateMachineType,
        logging_configuration: LoggingConfiguration,
        tracing_configuration: TracingConfiguration,
    ) -> StateMachineRevision | None:
        """Return the first stored revision that an identical CreateStateMachine request would map to.

        CreateStateMachine's idempotency check is based on the state machine name, definition, type,
        LoggingConfiguration and TracingConfiguration. If a following request has a different roleArn
        or tags, Step Functions will ignore these differences and treat it as an idempotent request of
        the previous. In this case, roleArn and tags will not be updated, even if they are different.

        Returns ``None`` when no stored revision matches on all five properties.
        """
        # Iterate over a snapshot of the stored state machines.
        stored: list[StateMachineInstance] = list(self.get_store(context).state_machines.values())
        for candidate in stored:
            if not isinstance(candidate, StateMachineRevision):
                continue
            if (
                candidate.name == name
                and candidate.definition == definition
                and candidate.sm_type == state_machine_type
                and candidate.logging_config == logging_configuration
                and candidate.tracing_config == tracing_configuration
            ):
                return candidate
        return None
1✔
345

346
    def _idempotent_start_execution(
        self,
        execution: Execution | None,
        state_machine: StateMachineInstance,
        name: Name,
        input_data: SensitiveData,
    ) -> Execution | None:
        """Resolve a StartExecution request against an already existing execution.

        StartExecution is idempotent for STANDARD workflows. For a STANDARD workflow,
        if you call StartExecution with the same name and input as a running execution,
        the call succeeds and return the same response as the original request.
        If the execution is closed or if the input is different,
        it returns a 400 ExecutionAlreadyExists error. You can reuse names after 90 days.

        Returns ``None`` when ``execution`` is falsy, the existing execution when the request is
        idempotent, and raises ``CommonServiceException`` (code ``ExecutionAlreadyExists``) otherwise.
        """
        if not execution:
            return None

        # Read both values up front, before any comparison is made.
        current_status = execution.exec_status
        machine_type = state_machine.sm_type

        is_repeat_of_running_standard_execution = (
            name == execution.name
            and input_data == execution.input_data
            and current_status == ExecutionStatus.RUNNING
            and machine_type == StateMachineType.STANDARD
        )
        if is_repeat_of_running_standard_execution:
            return execution

        raise CommonServiceException(
            code="ExecutionAlreadyExists",
            message=f"Execution Already Exists: '{execution.exec_arn}'",
        )
375

376
    def _revision_by_name(self, context: RequestContext, name: str) -> StateMachineInstance | None:
        """Return the first stored ``StateMachineRevision`` with the given name, or ``None`` if there is none."""
        # Search a snapshot of the stored state machines.
        stored: list[StateMachineInstance] = list(self.get_store(context).state_machines.values())
        return next(
            (
                candidate
                for candidate in stored
                if isinstance(candidate, StateMachineRevision) and candidate.name == name
            ),
            None,
        )
1✔
384

385
    @staticmethod
1✔
386
    def _validate_definition(definition: str, static_analysers: list[StaticAnalyser]) -> None:
        """Run every given static analyser over ``definition``, in order.

        Any failure is surfaced as ``InvalidDefinition``: for an ``ASLParserException`` the message is
        the ``repr`` of that exception; for any other exception it names the exception class, its
        arguments and the offending definition.
        """
        try:
            for analyser in static_analysers:
                analyser.analyse(definition)
        except ASLParserException as parser_error:
            definition_error = InvalidDefinition()
            definition_error.message = repr(parser_error)
            raise definition_error
        except Exception as unexpected_error:
            error_name = unexpected_error.__class__.__name__
            error_args = list(unexpected_error.args)
            definition_error = InvalidDefinition()
            definition_error.message = (
                f"Error={error_name} Args={error_args} in definition '{definition}'."
            )
            raise definition_error
1✔
402

403
    @staticmethod
1✔
404
    def _sanitise_logging_configuration(
        logging_configuration: LoggingConfiguration,
    ) -> None:
        """Validate the logging configuration and fill in its defaults in place.

        Raises ``InvalidLoggingConfiguration`` when more than one destination is given, or when the
        level is set to something other than OFF without any destination. Afterwards, 'level'
        defaults to ``LogLevel.OFF`` and 'includeExecutionData' defaults to ``False``.
        """
        destinations = logging_configuration.get("destinations")
        level = logging_configuration.get("level")

        has_too_many_destinations = destinations is not None and len(destinations) > 1
        if has_too_many_destinations:
            raise InvalidLoggingConfiguration(
                "Invalid Logging Configuration: Must specify exactly one Log Destination."
            )

        # A LogLevel that is not OFF, should have a destination.
        logging_is_enabled = level is not None and level != LogLevel.OFF
        if logging_is_enabled and not destinations:
            raise InvalidLoggingConfiguration(
                "Invalid Logging Configuration: Must specify exactly one Log Destination."
            )

        # Write the defaults back onto the configuration object:
        # level falls back to OFF, includeExecutionData falls back to False.
        include_execution_data = logging_configuration.get("includeExecutionData", False)
        logging_configuration["level"] = level or LogLevel.OFF
        logging_configuration["includeExecutionData"] = include_execution_data
1✔
430

431
    def create_state_machine(
        self, context: RequestContext, request: CreateStateMachineInput, **kwargs
    ) -> CreateStateMachineOutput:
        """Create a state machine revision, and optionally publish its first version.

        Steps, in order: validate the publish/versionDescription combination, sanitise the logging
        configuration, short-circuit idempotent requests, reject duplicate names, validate the
        CloudWatch logging configuration and the definition, store the new revision, publish a
        version when requested, and finally process usage metrics for the definition.

        Raises ``ValidationException`` when a version description is given without 'publish', and
        ``StateMachineAlreadyExists`` when a different revision with the same name is stored.
        """
        publish = request.get("publish", False)
        version_description = request.get("versionDescription")
        if version_description and not publish:
            raise ValidationException("Version description can only be set when publish is true")

        # Extract parameters and set defaults.
        name = request["name"]
        role_arn = request["roleArn"]
        definition = request["definition"]
        sm_type = request.get("type") or StateMachineType.STANDARD
        tracing_configuration = request.get("tracingConfiguration")
        tags = request.get("tags")
        logging_configuration = request.get("loggingConfiguration", LoggingConfiguration())
        self._sanitise_logging_configuration(logging_configuration=logging_configuration)

        # CreateStateMachine is an idempotent API. Subsequent requests won't create a duplicate resource if it was
        # already created.
        existing_revision: StateMachineRevision | None = self._idempotent_revision(
            context=context,
            name=name,
            definition=definition,
            state_machine_type=sm_type,
            logging_configuration=logging_configuration,
            tracing_configuration=tracing_configuration,
        )
        if existing_revision is not None:
            return CreateStateMachineOutput(
                stateMachineArn=existing_revision.arn,
                creationDate=existing_revision.create_date,
            )

        # Assert this state machine name is unique.
        revision_with_same_name: StateMachineRevision | None = self._revision_by_name(
            context=context, name=name
        )
        if revision_with_same_name is not None:
            raise StateMachineAlreadyExists(
                f"State Machine Already Exists: '{revision_with_same_name.arn}'"
            )

        # Compute the state machine's Arn.
        state_machine_arn = stepfunctions_state_machine_arn(
            name=name,
            account_id=context.account_id,
            region_name=context.region,
        )
        stored_state_machines = self.get_store(context).state_machines

        # Reduce the logging configuration to a usable cloud watch representation, and validate the destinations
        # if any were given.
        cloud_watch_logging_configuration = (
            CloudWatchLoggingConfiguration.from_logging_configuration(
                state_machine_arn=state_machine_arn,
                logging_configuration=logging_configuration,
            )
        )
        if cloud_watch_logging_configuration is not None:
            cloud_watch_logging_configuration.validate()

        # Run the static analyser matching the state machine type on the definition given.
        analyser = (
            ExpressStaticAnalyser() if sm_type == StateMachineType.EXPRESS else StaticAnalyser()
        )
        StepFunctionsProvider._validate_definition(
            definition=definition, static_analysers=[analyser]
        )

        # Create the state machine and add it to the store.
        revision = StateMachineRevision(
            name=name,
            arn=state_machine_arn,
            role_arn=role_arn,
            definition=definition,
            sm_type=sm_type,
            logging_config=logging_configuration,
            cloud_watch_logging_configuration=cloud_watch_logging_configuration,
            tracing_config=tracing_configuration,
            tags=tags,
        )
        stored_state_machines[state_machine_arn] = revision

        output = CreateStateMachineOutput(
            stateMachineArn=revision.arn, creationDate=revision.create_date
        )

        # Create the first version if the 'publish' flag is used.
        if publish:
            published_version = revision.create_version(description=version_description)
            if published_version is not None:
                published_version_arn = published_version.arn
                stored_state_machines[published_version_arn] = published_version
                output["stateMachineVersionArn"] = published_version_arn

        # Run static analyser on definition and collect usage metrics
        UsageMetricsStaticAnalyser.process(definition)

        return output
1✔
537

538
    def _validate_state_machine_alias_routing_configuration(
        self, context: RequestContext, routing_configuration_list: RoutingConfigurationList
    ) -> None:
        """Validate the routing configuration of a state machine alias.

        Checks, in order:
        1. the list has one or two entries;
        2. the state machine version ARNs are distinct;
        3. each weight lies in [0, 100];
        4. the weights sum to exactly 100;
        5. every ARN refers to a stored ``StateMachineVersion``, and all versions belong to the same
           state machine as the first entry.

        Raises ``ValidationException`` on the first check that fails.
        """
        # TODO: to match AWS's approach best validation exceptions could be
        #  built in a process decoupled from the provider.

        entry_count = len(routing_configuration_list)
        if not (1 <= entry_count <= 2):
            # Replicate the object string dump format:
            # [RoutingConfigurationListItem(stateMachineVersionArn=arn_no_quotes, weight=int), ...]
            serialised_items = ", ".join(
                "".join(
                    [
                        "RoutingConfigurationListItem(stateMachineVersionArn=",
                        item["stateMachineVersionArn"],
                        ", weight=",
                        str(item["weight"]),
                        ")",
                    ]
                )
                for item in routing_configuration_list
            )
            # Report the bound that was actually violated: an empty list breaks the lower bound,
            # not the upper one.
            if entry_count < 1:
                violated_constraint = "Member must have length greater than or equal to 1"
            else:
                violated_constraint = "Member must have length less than or equal to 2"
            raise ValidationException(
                f"1 validation error detected: Value '[{serialised_items}]' "
                "at 'routingConfiguration' failed to "
                f"satisfy constraint: {violated_constraint}"
            )

        version_arns = [item["stateMachineVersionArn"] for item in routing_configuration_list]
        if len(set(version_arns)) < entry_count:
            raise ValidationException(
                "Routing configuration must contain distinct state machine version ARNs. "
                f"Received: [{', '.join(version_arns)}]"
            )

        weights = [item["weight"] for item in routing_configuration_list]
        # Error messages use 1-based positions.
        for position, weight in enumerate(weights, start=1):
            # TODO: check for weight type.
            if weight < 0:
                raise ValidationException(
                    f"Invalid value for parameter routingConfiguration[{position}].weight, value: {weight}, valid min value: 0"
                )
            if weight > 100:
                raise ValidationException(
                    f"1 validation error detected: Value '{weight}' at 'routingConfiguration.{position}.member.weight' "
                    "failed to satisfy constraint: Member must have value less than or equal to 100"
                )
        if sum(weights) != 100:
            raise ValidationException(
                f"Sum of routing configuration weights must equal 100. Received: {json.dumps(weights)}"
            )

        state_machines = self.get_store(context=context).state_machines

        # All versions must share the state machine of the first routing entry.
        shared_state_machine_arn = self._get_state_machine_arn_from_qualified_arn(
            qualified_arn=version_arns[0]
        )
        for version_arn in version_arns:
            if not isinstance(state_machines.get(version_arn), StateMachineVersion):
                raise ValidationException(
                    f"Routing configuration must contain state machine version ARNs. Received: [{', '.join(version_arns)}]"
                )
            state_machine_arn = self._get_state_machine_arn_from_qualified_arn(
                qualified_arn=version_arn
            )
            if state_machine_arn != shared_state_machine_arn:
                # NOTE(review): this replaces a placeholder message; the exact AWS wording for this
                # case has not been verified — confirm against the service before snapshotting.
                raise ValidationException(
                    "Routing configuration must contain state machine version ARNs of the same "
                    f"state machine. Received: [{', '.join(version_arns)}]"
                )
×
620

621
    @staticmethod
1✔
622
    def _get_state_machine_arn_from_qualified_arn(qualified_arn: Arn) -> Arn:
        """Return the part of ``qualified_arn`` that precedes its last ':' separator."""
        qualifier_separator_index = qualified_arn.rfind(":")
        return qualified_arn[:qualifier_separator_index]
1✔
626

627
    def create_state_machine_alias(
        self,
        context: RequestContext,
        name: CharacterRestrictedName,
        routing_configuration: RoutingConfigurationList,
        description: AliasDescription = None,
        **kwargs,
    ) -> CreateStateMachineAliasOutput:
        """Create an alias that routes to one or two versions of a single state machine.

        The alias name and routing configuration are validated first. If an alias with the same ARN
        is already stored, the request either succeeds idempotently, returning the stored alias'
        creation date, or fails with ``ConflictException``. Otherwise the alias is stored and
        registered on its state machine revision.

        Raises:
            ValidationException: the name or routing configuration is invalid.
            ConflictException: an alias with the same ARN exists and the request is not idempotent.
            RuntimeError: no state machine revision is stored for the referenced versions.
        """
        # Validate the inputs.
        self._validate_state_machine_alias_name(name=name)
        self._validate_state_machine_alias_routing_configuration(
            context=context, routing_configuration_list=routing_configuration
        )

        # Determine the state machine arn this alias maps to,
        # do so unsafely as validation already took place before initialisation.
        first_routing_qualified_arn = routing_configuration[0]["stateMachineVersionArn"]
        state_machine_revision_arn = self._get_state_machine_arn_from_qualified_arn(
            qualified_arn=first_routing_qualified_arn
        )
        alias = Alias(
            state_machine_arn=state_machine_revision_arn,
            name=name,
            description=description,
            routing_configuration_list=routing_configuration,
        )
        state_machine_alias_arn = alias.state_machine_alias_arn

        store = self.get_store(context=context)
        aliases = store.aliases

        if existing_alias := aliases.get(state_machine_alias_arn):
            if not alias.is_idempotent(existing_alias):
                # CreateStateMachineAlias is an idempotent API. Idempotent requests won't create duplicate resources.
                raise ConflictException(
                    "Failed to create alias because an alias with the same name and a "
                    "different routing configuration already exists."
                )
            # Report the stored alias' creation date, not that of the throw-away instance built
            # above, so that repeated requests return a stable response.
            return CreateStateMachineAliasOutput(
                stateMachineAliasArn=state_machine_alias_arn,
                creationDate=existing_alias.create_date,
            )

        # Resolve the revision before touching the store, so that a failure here does not leave
        # an alias behind that is not attached to any revision.
        state_machine_revision = store.state_machines.get(state_machine_revision_arn)
        if not isinstance(state_machine_revision, StateMachineRevision):
            # The state machine was deleted but not the version referenced in this context.
            raise RuntimeError(f"No state machine revision for arn '{state_machine_revision_arn}'")

        aliases[state_machine_alias_arn] = alias
        state_machine_revision.aliases.add(alias)

        return CreateStateMachineAliasOutput(
            stateMachineAliasArn=state_machine_alias_arn, creationDate=alias.create_date
        )
680

681
    def describe_state_machine(
        self,
        context: RequestContext,
        state_machine_arn: Arn,
        included_data: IncludedData = None,
        **kwargs,
    ) -> DescribeStateMachineOutput:
        """Return the description of the state machine stored under the given ARN.

        The ARN is validated first; an ARN with no stored state machine is reported
        through ``_raise_state_machine_does_not_exist``.
        """
        self._validate_state_machine_arn(state_machine_arn)

        known_state_machines = self.get_store(context).state_machines
        target = known_state_machines.get(state_machine_arn)
        if target is None:
            self._raise_state_machine_does_not_exist(state_machine_arn)

        return target.describe()
1✔
693

694
    def describe_state_machine_alias(
        self, context: RequestContext, state_machine_alias_arn: Arn, **kwargs
    ) -> DescribeStateMachineAliasOutput:
        """Return the description of the alias stored under the given alias ARN.

        :param context: request context used to select the store.
        :param state_machine_alias_arn: ARN of the alias; validated before lookup.
        :return: the alias description.
        :raises ResourceNotFound: if no alias is stored under the given ARN.
        """
        self._validate_state_machine_alias_arn(state_machine_alias_arn=state_machine_alias_arn)
        alias: Alias | None = self.get_store(context=context).aliases.get(state_machine_alias_arn)
        if alias is None:
            # Same error as the one update_state_machine_alias reports for an unknown alias.
            raise ResourceNotFound("Request references a resource that does not exist.")
        return alias.to_description()
1✔
704

705
    def describe_state_machine_for_execution(
        self,
        context: RequestContext,
        execution_arn: Arn,
        included_data: IncludedData = None,
        **kwargs,
    ) -> DescribeStateMachineForExecutionOutput:
        """Describe the state machine associated with the given execution ARN."""
        self._validate_state_machine_execution_arn(execution_arn)
        target_execution: Execution = self._get_execution(
            context=context, execution_arn=execution_arn
        )
        output = target_execution.to_describe_state_machine_for_execution_output()
        return output
1✔
715

716
    def send_task_heartbeat(
        self, context: RequestContext, task_token: TaskToken, **kwargs
    ) -> SendTaskHeartbeatOutput:
        """Forward a heartbeat for ``task_token`` to the running execution that owns it.

        Every running execution's callback pool manager is offered the heartbeat;
        the first one that accepts it ends the search.

        :raises TaskTimedOut: if the callback consumer reports a timeout.
        :raises TaskDoesNotExist: on any other callback consumer error.
        :raises InvalidToken: if no running execution accepts the token.
        """
        running_executions: list[Execution] = self._get_executions(context, ExecutionStatus.RUNNING)
        for execution in running_executions:
            try:
                if execution.exec_worker.env.callback_pool_manager.heartbeat(
                    callback_id=task_token
                ):
                    return SendTaskHeartbeatOutput()
            except CallbackNotifyConsumerError as consumer_error:
                if isinstance(consumer_error, CallbackConsumerTimeout):
                    raise TaskTimedOut()
                else:
                    raise TaskDoesNotExist()
        # Carry the same message as send_task_success / send_task_failure.
        raise InvalidToken("Invalid token")
×
732

733
    def send_task_success(
        self,
        context: RequestContext,
        task_token: TaskToken,
        output: SensitiveData,
        **kwargs,
    ) -> SendTaskSuccessOutput:
        """Deliver a success outcome for ``task_token`` to the running execution that owns it.

        Raises ``TaskTimedOut`` when the callback consumer reports a timeout,
        ``TaskDoesNotExist`` on any other consumer error, and ``InvalidToken`` when
        no running execution accepts the token.
        """
        success_outcome = CallbackOutcomeSuccess(callback_id=task_token, output=output)
        candidates: list[Execution] = self._get_executions(context, ExecutionStatus.RUNNING)
        for candidate in candidates:
            try:
                pool_manager = candidate.exec_worker.env.callback_pool_manager
                delivered = pool_manager.notify(callback_id=task_token, outcome=success_outcome)
                if delivered:
                    return SendTaskSuccessOutput()
            except CallbackNotifyConsumerError as consumer_error:
                timed_out = isinstance(consumer_error, CallbackConsumerTimeout)
                raise TaskTimedOut() if timed_out else TaskDoesNotExist()
        raise InvalidToken("Invalid token")
1✔
754

755
    def send_task_failure(
        self,
        context: RequestContext,
        task_token: TaskToken,
        error: SensitiveError = None,
        cause: SensitiveCause = None,
        **kwargs,
    ) -> SendTaskFailureOutput:
        """Deliver a failure outcome for ``task_token`` to the execution that owns it.

        All stored executions are offered the outcome, not only running ones.
        Raises ``TaskTimedOut`` when the callback consumer reports a timeout,
        ``TaskDoesNotExist`` on any other consumer error, and ``InvalidToken`` when
        no execution accepts the token.
        """
        failure_outcome = CallbackOutcomeFailure(callback_id=task_token, error=error, cause=cause)
        all_executions = self.get_store(context).executions
        for candidate in all_executions.values():
            try:
                pool_manager = candidate.exec_worker.env.callback_pool_manager
                delivered = pool_manager.notify(callback_id=task_token, outcome=failure_outcome)
                if delivered:
                    return SendTaskFailureOutput()
            except CallbackNotifyConsumerError as consumer_error:
                timed_out = isinstance(consumer_error, CallbackConsumerTimeout)
                raise TaskTimedOut() if timed_out else TaskDoesNotExist()
        raise InvalidToken("Invalid token")
1✔
777

778
    @staticmethod
1✔
779
    def _get_state_machine_arn(state_machine_arn: str) -> str:
1✔
780
        """Extract the state machine ARN by removing the test case suffix."""
781
        return state_machine_arn.split("#")[0]
1✔
782

783
    @staticmethod
    def _get_local_mock_test_case(
        state_machine_arn: str, state_machine_name: str
    ) -> LocalMockTestCase | None:
        """Extract and load a mock test case from a state machine ARN if present.

        :param state_machine_arn: ARN, optionally suffixed with ``#<test case name>``.
        :param state_machine_name: name passed to the mock test case loader.
        :return: the loaded test case, or None when the ARN does not consist of
            exactly one ``#``-separated suffix.
        :raises InvalidName: if a test case name is given but the loader returns None.
        """
        parts = state_machine_arn.split("#")
        if len(parts) != 2:
            return None

        mock_test_case_name = parts[1]
        mock_test_case = load_local_mock_test_case_for(
            state_machine_name=state_machine_name, test_case_name=mock_test_case_name
        )
        if mock_test_case is None:
            # Adjacent literals are concatenated verbatim, so each fragment
            # carries its own trailing space to keep the sentences apart.
            raise InvalidName(
                f"Invalid mock test case name '{mock_test_case_name}' "
                f"for state machine '{state_machine_name}'. "
                "Either the test case is not defined or the mock configuration file "
                "could not be loaded. See logs for details."
            )
        return mock_test_case
1✔
804

805
    def start_execution(
        self,
        context: RequestContext,
        state_machine_arn: Arn,
        name: Name = None,
        input: SensitiveData = None,
        trace_header: TraceHeader = None,
        **kwargs,
    ) -> StartExecutionOutput:
        """Start an execution of the state machine, version or alias behind the given ARN.

        The ARN may carry a ``#<test case>`` suffix selecting a local mock test case.
        When an execution with the computed ARN is already stored, the request is
        handed to ``_idempotent_start_execution`` and, if that yields an execution,
        its start output is returned instead of creating a new one.

        :raises InvalidExecutionInput: if ``input`` is provided but is not valid JSON.
        """
        self._validate_state_machine_arn(state_machine_arn)

        base_arn = self._get_state_machine_arn(state_machine_arn)
        store = self.get_store(context=context)

        # An alias samples one of its routed versions; fall back to the plain ARN otherwise.
        alias: Alias | None = store.aliases.get(base_arn)
        if alias is None:
            lookup_arn = base_arn
        else:
            lookup_arn = alias.sample() or base_arn
        unsafe_state_machine: StateMachineInstance | None = store.state_machines.get(lookup_arn)
        if not unsafe_state_machine:
            self._raise_state_machine_does_not_exist(base_arn)

        # Update event change parameters about the state machine and should not affect those about this execution.
        state_machine_clone = copy.deepcopy(unsafe_state_machine)

        input_data = {}
        if input is not None:
            try:
                input_data = json.loads(input)
            except Exception as ex:
                raise InvalidExecutionInput(str(ex))  # TODO: report parsing error like AWS.

        # Executions of a version are filed under the ARN of the version's source.
        if isinstance(state_machine_clone, StateMachineVersion):
            normalised_state_machine_arn = state_machine_clone.source_arn
        else:
            normalised_state_machine_arn = state_machine_clone.arn

        exec_name = name or long_uid()  # TODO: validate name format
        # Exhaustive check on STANDARD and EXPRESS type, validated on creation.
        arn_builder = (
            stepfunctions_standard_execution_arn
            if state_machine_clone.sm_type == StateMachineType.STANDARD
            else stepfunctions_express_execution_arn
        )
        exec_arn = arn_builder(normalised_state_machine_arn, exec_name)

        prior_execution = store.executions.get(exec_arn)
        if prior_execution:
            # Return already running execution if name and input match
            reusable_execution = self._idempotent_start_execution(
                execution=prior_execution,
                state_machine=state_machine_clone,
                name=name,
                input_data=input_data,
            )
            if reusable_execution:
                return reusable_execution.to_start_output()

        # Create the execution logging session, if logging is configured.
        cloud_watch_logging_session = None
        if state_machine_clone.cloud_watch_logging_configuration is not None:
            cloud_watch_logging_session = CloudWatchLoggingSession(
                execution_arn=exec_arn,
                configuration=state_machine_clone.cloud_watch_logging_configuration,
            )

        local_mock_test_case = self._get_local_mock_test_case(
            state_machine_arn, state_machine_clone.name
        )

        alias_arn = None if alias is None else alias.state_machine_alias_arn
        new_execution = Execution(
            name=exec_name,
            sm_type=state_machine_clone.sm_type,
            role_arn=state_machine_clone.role_arn,
            exec_arn=exec_arn,
            account_id=context.account_id,
            region_name=context.region,
            state_machine=state_machine_clone,
            state_machine_alias_arn=alias_arn,
            start_date=datetime.datetime.now(tz=datetime.UTC),
            cloud_watch_logging_session=cloud_watch_logging_session,
            input_data=input_data,
            trace_header=trace_header,
            activity_store=self.get_store(context).activities,
            local_mock_test_case=local_mock_test_case,
        )

        store.executions[exec_arn] = new_execution

        new_execution.start()
        return new_execution.to_start_output()
1✔
895

896
    def start_sync_execution(
        self,
        context: RequestContext,
        state_machine_arn: Arn,
        name: Name = None,
        input: SensitiveData = None,
        trace_header: TraceHeader = None,
        included_data: IncludedData = None,
        **kwargs,
    ) -> StartSyncExecutionOutput:
        """Start a synchronous execution of the state machine behind the given ARN.

        The ARN may carry a ``#<test case>`` suffix selecting a local mock test case.
        STANDARD state machines are rejected through
        ``_raise_state_machine_type_not_supported``.

        :raises InvalidExecutionInput: if ``input`` is provided but is not valid JSON.
        :raises InvalidName: if an execution with the computed ARN is already stored.
        """
        self._validate_state_machine_arn(state_machine_arn)

        base_arn = self._get_state_machine_arn(state_machine_arn)
        stored_state_machines = self.get_store(context).state_machines
        unsafe_state_machine: StateMachineInstance | None = stored_state_machines.get(base_arn)
        if not unsafe_state_machine:
            self._raise_state_machine_does_not_exist(base_arn)

        if unsafe_state_machine.sm_type == StateMachineType.STANDARD:
            self._raise_state_machine_type_not_supported()

        # Update event change parameters about the state machine and should not affect those about this execution.
        state_machine_clone = copy.deepcopy(unsafe_state_machine)

        input_data = {}
        if input is not None:
            try:
                input_data = json.loads(input)
            except Exception as ex:
                raise InvalidExecutionInput(str(ex))  # TODO: report parsing error like AWS.

        # Executions of a version are filed under the ARN of the version's source.
        if isinstance(state_machine_clone, StateMachineVersion):
            normalised_state_machine_arn = state_machine_clone.source_arn
        else:
            normalised_state_machine_arn = state_machine_clone.arn

        exec_name = name or long_uid()  # TODO: validate name format
        exec_arn = stepfunctions_express_execution_arn(normalised_state_machine_arn, exec_name)

        if exec_arn in self.get_store(context).executions:
            raise InvalidName()  # TODO

        # Create the execution logging session, if logging is configured.
        cloud_watch_logging_session = None
        if state_machine_clone.cloud_watch_logging_configuration is not None:
            cloud_watch_logging_session = CloudWatchLoggingSession(
                execution_arn=exec_arn,
                configuration=state_machine_clone.cloud_watch_logging_configuration,
            )

        local_mock_test_case = self._get_local_mock_test_case(
            state_machine_arn, state_machine_clone.name
        )

        sync_execution = SyncExecution(
            name=exec_name,
            sm_type=state_machine_clone.sm_type,
            role_arn=state_machine_clone.role_arn,
            exec_arn=exec_arn,
            account_id=context.account_id,
            region_name=context.region,
            state_machine=state_machine_clone,
            start_date=datetime.datetime.now(tz=datetime.UTC),
            cloud_watch_logging_session=cloud_watch_logging_session,
            input_data=input_data,
            trace_header=trace_header,
            activity_store=self.get_store(context).activities,
            local_mock_test_case=local_mock_test_case,
        )
        self.get_store(context).executions[exec_arn] = sync_execution

        sync_execution.start()
        return sync_execution.to_start_sync_execution_output()
1✔
971

972
    def describe_execution(
        self,
        context: RequestContext,
        execution_arn: Arn,
        included_data: IncludedData = None,
        **kwargs,
    ) -> DescribeExecutionOutput:
        """Describe the execution stored under the given ARN (STANDARD workflows only)."""
        self._validate_state_machine_execution_arn(execution_arn)
        target_execution: Execution = self._get_execution(
            context=context, execution_arn=execution_arn
        )

        # Action only compatible with STANDARD workflows.
        workflow_type = target_execution.sm_type
        if workflow_type != StateMachineType.STANDARD:
            self._raise_resource_type_not_in_context(resource_type=workflow_type)

        return target_execution.to_describe_output()
1✔
987

988
    @staticmethod
    def _list_execution_filter(
        ex: Execution, state_machine_arn: str, status_filter: str | None
    ) -> bool:
        """Tell whether an execution belongs in a listing for the given ARN and status.

        The execution matches when the ARN equals either its state machine ARN or
        its state machine version ARN, and, if a status filter is given, when its
        status equals that filter.
        """
        referenced_arns = (ex.state_machine_arn, ex.state_machine_version_arn)
        if state_machine_arn not in referenced_arns:
            return False
        if status_filter:
            return ex.exec_status == status_filter
        return True
1✔
999

1000
    def list_executions(
        self,
        context: RequestContext,
        state_machine_arn: Arn = None,
        status_filter: ExecutionStatus = None,
        max_results: PageSize = None,
        next_token: ListExecutionsPageToken = None,
        map_run_arn: LongArn = None,
        redrive_filter: ExecutionRedriveFilter = None,
        **kwargs,
    ) -> ListExecutionsOutput:
        """List the executions of a STANDARD state machine, newest first, one page at a time.

        :raises CommonServiceException: with code ``ValidationException`` if the
            status filter is not a listed status, or if neither a state machine ARN
            nor a map run ARN is given.
        """
        self._validate_state_machine_arn(state_machine_arn)
        assert_pagination_parameters_valid(
            max_results=max_results,
            next_token=next_token,
            next_token_length_limit=3096,
        )
        max_results = normalise_max_results(max_results)

        state_machine = self.get_store(context).state_machines.get(state_machine_arn)
        if state_machine is None:
            self._raise_state_machine_does_not_exist(state_machine_arn)

        if state_machine.sm_type != StateMachineType.STANDARD:
            self._raise_state_machine_type_not_supported()

        allowed_execution_status = [
            ExecutionStatus.SUCCEEDED,
            ExecutionStatus.TIMED_OUT,
            ExecutionStatus.PENDING_REDRIVE,
            ExecutionStatus.ABORTED,
            ExecutionStatus.FAILED,
            ExecutionStatus.RUNNING,
        ]

        # Collect every problem first so that they are reported together.
        validation_errors = []

        if status_filter and status_filter not in allowed_execution_status:
            allowed_values = ", ".join(allowed_execution_status)
            validation_errors.append(
                f"Value '{status_filter}' at 'statusFilter' failed to satisfy constraint: "
                f"Member must satisfy enum value set: [{allowed_values}]"
            )

        if not state_machine_arn and not map_run_arn:
            validation_errors.append("Must provide a StateMachine ARN or MapRun ARN")

        if validation_errors:
            error_count = len(validation_errors)
            noun = "errors" if error_count > 1 else "error"
            errors_message = "; ".join(validation_errors)
            message = f"{error_count} validation {noun} detected: {errors_message}"
            raise CommonServiceException(message=message, code="ValidationException")

        matching_items = []
        for execution in self.get_store(context).executions.values():
            if self._list_execution_filter(
                execution,
                state_machine_arn=state_machine_arn,
                status_filter=status_filter,
            ):
                matching_items.append(execution.to_execution_list_item())

        executions: ExecutionList = sorted(
            matching_items, key=lambda item: item["startDate"], reverse=True
        )

        page, token_for_next_page = PaginatedList(executions).get_page(
            token_generator=lambda item: get_next_page_token_from_arn(item.get("executionArn")),
            page_size=max_results,
            next_token=next_token,
        )

        return ListExecutionsOutput(executions=page, nextToken=token_for_next_page)
1✔
1072

1073
    def list_state_machines(
        self,
        context: RequestContext,
        max_results: PageSize = None,
        next_token: PageToken = None,
        **kwargs,
    ) -> ListStateMachinesOutput:
        """List the stored state machine revisions, sorted by name, one page at a time.

        Only ``StateMachineRevision`` entries are listed; other entries of the
        state machine store are left out.
        """
        assert_pagination_parameters_valid(max_results, next_token)
        max_results = normalise_max_results(max_results)

        revision_items = []
        for stored in self.get_store(context).state_machines.values():
            if isinstance(stored, StateMachineRevision):
                revision_items.append(stored.itemise())
        state_machines: StateMachineList = sorted(revision_items, key=lambda item: item["name"])

        page, token_for_next_page = PaginatedList(state_machines).get_page(
            token_generator=lambda item: get_next_page_token_from_arn(item.get("stateMachineArn")),
            page_size=max_results,
            next_token=next_token,
        )

        return ListStateMachinesOutput(stateMachines=page, nextToken=token_for_next_page)
1✔
1098

1099
    def list_state_machine_aliases(
        self,
        context: RequestContext,
        state_machine_arn: Arn,
        next_token: PageToken = None,
        max_results: PageSize = None,
        **kwargs,
    ) -> ListStateMachineAliasesOutput:
        """List the aliases of a state machine revision, oldest first, one page at a time.

        :raises InvalidArn: if the ARN does not refer to a stored ``StateMachineRevision``.
        :raises InvalidToken: if ``next_token`` is given but matches none of the
            revision's aliases.
        """
        assert_pagination_parameters_valid(max_results, next_token)

        self._validate_state_machine_arn(state_machine_arn)
        state_machine_revision = self.get_store(context).state_machines.get(state_machine_arn)
        if not isinstance(state_machine_revision, StateMachineRevision):
            raise InvalidArn(f"Invalid arn: {state_machine_arn}")

        # A missing token is always acceptable; a given one must belong to some alias.
        token_is_known = next_token is None
        state_machine_aliases: StateMachineAliasList = []
        for revision_alias in state_machine_revision.aliases:
            state_machine_aliases.append(revision_alias.to_item())
            if revision_alias.tokenized_state_machine_alias_arn == next_token:
                token_is_known = True

        if not token_is_known:
            raise InvalidToken("Invalid Token: 'Invalid token'")

        state_machine_aliases.sort(key=lambda item: item["creationDate"])

        if max_results is None or max_results == 0:
            effective_page_size = 100
        else:
            effective_page_size = max_results

        paginated_aliases, next_token = PaginatedList(state_machine_aliases).get_page(
            token_generator=lambda item: get_next_page_token_from_arn(
                item.get("stateMachineAliasArn")
            ),
            next_token=next_token,
            page_size=effective_page_size,
        )

        return ListStateMachineAliasesOutput(
            stateMachineAliases=paginated_aliases, nextToken=next_token
        )
1141

1142
    def list_state_machine_versions(
        self,
        context: RequestContext,
        state_machine_arn: Arn,
        next_token: PageToken = None,
        max_results: PageSize = None,
        **kwargs,
    ) -> ListStateMachineVersionsOutput:
        """List the versions of a state machine revision, newest first, one page at a time.

        :raises InvalidArn: if the ARN does not refer to a stored ``StateMachineRevision``.
        :raises RuntimeError: if a version ARN recorded on the revision resolves to
            something other than a ``StateMachineVersion``.
        """
        self._validate_state_machine_arn(state_machine_arn)
        assert_pagination_parameters_valid(max_results, next_token)
        max_results = normalise_max_results(max_results)

        state_machines = self.get_store(context).state_machines
        state_machine_revision = state_machines.get(state_machine_arn)
        if not isinstance(state_machine_revision, StateMachineRevision):
            raise InvalidArn(f"Invalid arn: {state_machine_arn}")

        state_machine_version_items = []
        for version_arn in state_machine_revision.versions.values():
            state_machine_version = state_machines[version_arn]
            if not isinstance(state_machine_version, StateMachineVersion):
                raise RuntimeError(
                    f"Expected {version_arn} to be a StateMachine Version, but got '{type(state_machine_version)}'."
                )
            state_machine_version_items.append(state_machine_version.itemise())

        state_machine_version_items.sort(key=lambda item: item["creationDate"], reverse=True)

        page, token_for_next_page = PaginatedList(state_machine_version_items).get_page(
            token_generator=lambda item: get_next_page_token_from_arn(
                item.get("stateMachineVersionArn")
            ),
            page_size=max_results,
            next_token=next_token,
        )

        return ListStateMachineVersionsOutput(
            stateMachineVersions=page, nextToken=token_for_next_page
        )
1183

1184
    def get_execution_history(
        self,
        context: RequestContext,
        execution_arn: Arn,
        max_results: PageSize = None,
        reverse_order: ReverseOrder = None,
        next_token: PageToken = None,
        include_execution_data: IncludeExecutionDataGetExecutionHistory = None,
        **kwargs,
    ) -> GetExecutionHistoryOutput:
        """Return the event history of a STANDARD execution, optionally in reverse order."""
        # TODO: add support for paging, ordering, and other manipulations.
        self._validate_state_machine_execution_arn(execution_arn)
        target_execution: Execution = self._get_execution(
            context=context, execution_arn=execution_arn
        )

        # Action only compatible with STANDARD workflows.
        workflow_type = target_execution.sm_type
        if workflow_type != StateMachineType.STANDARD:
            self._raise_resource_type_not_in_context(resource_type=workflow_type)

        history: GetExecutionHistoryOutput = target_execution.to_history_output()
        if reverse_order:
            # Reverses the event list of the output in place.
            history["events"].reverse()
        return history
1✔
1206

1207
    def delete_state_machine(
        self, context: RequestContext, state_machine_arn: Arn, **kwargs
    ) -> DeleteStateMachineOutput:
        """Remove a state machine revision and all of its versions from the store.

        ARNs that do not resolve to a stored ``StateMachineRevision`` are ignored.
        """
        # TODO: halt executions?
        self._validate_state_machine_arn(state_machine_arn)
        state_machines = self.get_store(context).state_machines
        revision = state_machines.get(state_machine_arn)
        if not isinstance(revision, StateMachineRevision):
            return DeleteStateMachineOutput()

        state_machines.pop(state_machine_arn)
        # Versions are stored alongside the revision under their own ARNs.
        for version_arn in revision.versions.values():
            state_machines.pop(version_arn, None)
        return DeleteStateMachineOutput()
1✔
1219

1220
    def delete_state_machine_alias(
        self, context: RequestContext, state_machine_alias_arn: Arn, **kwargs
    ) -> DeleteStateMachineAliasOutput:
        """Remove an alias from the store and from the revisions that track it.

        Deleting an alias that is not stored is a no-op.

        :param context: request context used to select the store.
        :param state_machine_alias_arn: ARN of the alias; validated before use.
        :return: an empty ``DeleteStateMachineAliasOutput``.
        """
        self._validate_state_machine_alias_arn(state_machine_alias_arn=state_machine_alias_arn)
        store = self.get_store(context=context)
        alias = store.aliases.pop(state_machine_alias_arn, None)
        if alias is not None:
            state_machines = store.state_machines
            for routing_configuration in alias.get_routing_configuration_list():
                state_machine_version_arn = routing_configuration["stateMachineVersionArn"]
                # Skip routes whose version, or the version's source revision,
                # is no longer stored; isinstance() is also False for None.
                state_machine_version = state_machines.get(state_machine_version_arn)
                if not isinstance(state_machine_version, StateMachineVersion):
                    continue
                state_machine_revision = state_machines.get(state_machine_version.source_arn)
                if not isinstance(state_machine_revision, StateMachineRevision):
                    continue
                state_machine_revision.aliases.discard(alias)
        # Return the output type this operation declares.
        return DeleteStateMachineAliasOutput()
1✔
1240

1241
    def delete_state_machine_version(
        self, context: RequestContext, state_machine_version_arn: LongArn, **kwargs
    ) -> DeleteStateMachineVersionOutput:
        """Remove a state machine version unless an alias still routes to it.

        ARNs that do not resolve to a stored ``StateMachineVersion`` are ignored.

        :raises ConflictException: if any alias of the source revision is a router
            for this version.
        """
        self._validate_state_machine_arn(state_machine_version_arn)
        state_machines = self.get_store(context).state_machines

        state_machine_version = state_machines.get(state_machine_version_arn)
        if not state_machine_version or not isinstance(
            state_machine_version, StateMachineVersion
        ):
            return DeleteStateMachineVersionOutput()

        state_machine_revision = state_machines.get(state_machine_version.source_arn)
        if state_machine_revision and isinstance(state_machine_revision, StateMachineRevision):
            referencing_alias_names: list[str] = [
                alias.name
                for alias in state_machine_revision.aliases
                if alias.is_router_for(state_machine_version_arn=state_machine_version_arn)
            ]
            if referencing_alias_names:
                referencing_alias_names_list_body = ", ".join(referencing_alias_names)
                raise ConflictException(
                    "Version to be deleted must not be referenced by an alias. "
                    f"Current list of aliases referencing this version: [{referencing_alias_names_list_body}]"
                )
            state_machine_revision.delete_version(state_machine_version_arn)

        state_machines.pop(state_machine_version.arn, None)
        return DeleteStateMachineVersionOutput()
1✔
1269

1270
    def stop_execution(
        self,
        context: RequestContext,
        execution_arn: Arn,
        error: SensitiveError = None,
        cause: SensitiveCause = None,
        **kwargs,
    ) -> StopExecutionOutput:
        """Stop a STANDARD execution and report the timestamp it was stopped at."""
        self._validate_state_machine_execution_arn(execution_arn)
        target_execution: Execution = self._get_execution(
            context=context, execution_arn=execution_arn
        )

        # Action only compatible with STANDARD workflows.
        workflow_type = target_execution.sm_type
        if workflow_type != StateMachineType.STANDARD:
            self._raise_resource_type_not_in_context(resource_type=workflow_type)

        # The same timestamp is handed to the execution and returned to the caller.
        stopped_at = datetime.datetime.now(tz=datetime.UTC)
        target_execution.stop(stop_date=stopped_at, cause=cause, error=error)
        return StopExecutionOutput(stopDate=stopped_at)
1✔
1288

1289
    def update_state_machine(
        self,
        context: RequestContext,
        state_machine_arn: Arn,
        definition: Definition = None,
        role_arn: Arn = None,
        logging_configuration: LoggingConfiguration = None,
        tracing_configuration: TracingConfiguration = None,
        publish: Publish = None,
        version_description: VersionDescription = None,
        encryption_configuration: EncryptionConfiguration = None,
        **kwargs,
    ) -> UpdateStateMachineOutput:
        """Create a new revision of a state machine and optionally publish a version of it.

        :raises MissingRequiredParameter: if none of the definition, the role ARN
            and the logging configuration is provided.
        """
        self._validate_state_machine_arn(state_machine_arn)
        state_machines = self.get_store(context).state_machines

        state_machine = state_machines.get(state_machine_arn)
        if not isinstance(state_machine, StateMachineRevision):
            self._raise_state_machine_does_not_exist(state_machine_arn)

        # TODO: Add logic to handle metrics for when SFN definitions update
        if not (definition or role_arn or logging_configuration):
            raise MissingRequiredParameter(
                "Either the definition, the role ARN, the LoggingConfiguration, "
                "or the TracingConfiguration must be specified"
            )

        if definition is not None:
            self._validate_definition(definition=definition, static_analysers=[StaticAnalyser()])

        if logging_configuration is not None:
            self._sanitise_logging_configuration(logging_configuration=logging_configuration)

        revision_id = state_machine.create_revision(
            definition=definition,
            role_arn=role_arn,
            logging_configuration=logging_configuration,
        )

        version_arn = None
        if publish:
            published_version = state_machine.create_version(description=version_description)
            if published_version is None:
                # No new version was created: report the version ARN already
                # recorded for the targeted revision id.
                target_revision_id = revision_id or state_machine.revision_id
                version_arn = state_machine.versions[target_revision_id]
            else:
                version_arn = published_version.arn
                state_machines[version_arn] = published_version

        update_output = UpdateStateMachineOutput(updateDate=datetime.datetime.now(tz=datetime.UTC))
        # Optional members are only set when there is a value to report.
        if revision_id is not None:
            update_output["revisionId"] = revision_id
        if version_arn is not None:
            update_output["stateMachineVersionArn"] = version_arn
        return update_output
1✔
1344

1345
    def update_state_machine_alias(
        self,
        context: RequestContext,
        state_machine_alias_arn: Arn,
        description: AliasDescription = None,
        routing_configuration: RoutingConfigurationList = None,
        **kwargs,
    ) -> UpdateStateMachineAliasOutput:
        """Update the description and/or routing configuration of an alias.

        :param context: request context used to resolve the backing store.
        :param state_machine_alias_arn: ARN of the alias to update.
        :param description: new alias description.
        :param routing_configuration: new routing configuration; validated when given.
        :return: output carrying the alias' ``update_date``.
        :raises MissingRequiredParameter: if both optional arguments are falsy.
        :raises ResourceNotFound: if no alias is stored under the given ARN.
        """
        self._validate_state_machine_alias_arn(state_machine_alias_arn=state_machine_alias_arn)

        if not (description or routing_configuration):
            raise MissingRequiredParameter(
                "Either the description or the RoutingConfiguration must be specified"
            )

        if routing_configuration is not None:
            self._validate_state_machine_alias_routing_configuration(
                context=context, routing_configuration_list=routing_configuration
            )

        # The alias lookup happens only after the request itself has been validated.
        alias = self.get_store(context=context).aliases.get(state_machine_alias_arn)
        if alias is None:
            raise ResourceNotFound("Request references a resource that does not exist.")

        alias.update(description=description, routing_configuration_list=routing_configuration)
        return UpdateStateMachineAliasOutput(updateDate=alias.update_date)
1✔
1369

1370
    def publish_state_machine_version(
        self,
        context: RequestContext,
        state_machine_arn: Arn,
        revision_id: RevisionId = None,
        description: VersionDescription = None,
        **kwargs,
    ) -> PublishStateMachineVersionOutput:
        """Publish a version for the current revision of a state machine.

        :param context: request context used to resolve the backing store.
        :param state_machine_arn: ARN of the state machine revision to publish.
        :param revision_id: when given, must equal the revision's current id.
        :param description: description passed to ``create_version``.
        :return: creation date and ARN of the published (or already existing) version.
        :raises ConflictException: if ``revision_id`` differs from the current revision id.
        """
        self._validate_state_machine_arn(state_machine_arn)
        stored_machines = self.get_store(context).state_machines

        revision = stored_machines.get(state_machine_arn)
        if not isinstance(revision, StateMachineRevision):
            self._raise_state_machine_does_not_exist(state_machine_arn)

        if revision_id is not None and revision.revision_id != revision_id:
            raise ConflictException(
                f"Failed to publish the State Machine version for revision {revision_id}. "
                f"The current State Machine revision is {revision.revision_id}."
            )

        version = revision.create_version(description=description)
        if version is None:
            # No version object was returned: resolve the version already
            # recorded for this revision through its ARN.
            existing_arn = revision.versions.get(revision_id or revision.revision_id)
            version = stored_machines[existing_arn]
        else:
            stored_machines[version.arn] = version

        return PublishStateMachineVersionOutput(
            creationDate=version.create_date,
            stateMachineVersionArn=version.arn,
        )
1403

1404
    def tag_resource(
        self, context: RequestContext, resource_arn: Arn, tags: TagList, **kwargs
    ) -> TagResourceOutput:
        """Add tags to the state machine revision stored under ``resource_arn``.

        :raises ResourceNotFound: if the ARN does not map to a ``StateMachineRevision``.
        """
        # TODO: add tagging for activities.
        resource = self.get_store(context).state_machines.get(resource_arn)
        if not isinstance(resource, StateMachineRevision):
            raise ResourceNotFound(f"Resource not found: '{resource_arn}'")

        resource.tag_manager.add_all(tags)
        return TagResourceOutput()
1✔
1415

1416
    def untag_resource(
        self, context: RequestContext, resource_arn: Arn, tag_keys: TagKeyList, **kwargs
    ) -> UntagResourceOutput:
        """Remove the given tag keys from the state machine revision under ``resource_arn``.

        :raises ResourceNotFound: if the ARN does not map to a ``StateMachineRevision``.
        """
        # TODO: add untagging for activities.
        resource = self.get_store(context).state_machines.get(resource_arn)
        if not isinstance(resource, StateMachineRevision):
            raise ResourceNotFound(f"Resource not found: '{resource_arn}'")

        resource.tag_manager.remove_all(tag_keys)
        return UntagResourceOutput()
1✔
1427

1428
    def list_tags_for_resource(
        self, context: RequestContext, resource_arn: Arn, **kwargs
    ) -> ListTagsForResourceOutput:
        """Return the tags of the state machine revision stored under ``resource_arn``.

        :raises ResourceNotFound: if the ARN does not map to a ``StateMachineRevision``.
        """
        # TODO: add tag listing for activities.
        resource = self.get_store(context).state_machines.get(resource_arn)
        if not isinstance(resource, StateMachineRevision):
            raise ResourceNotFound(f"Resource not found: '{resource_arn}'")

        return ListTagsForResourceOutput(tags=resource.tag_manager.to_tag_list())
1✔
1439

1440
    def describe_map_run(
        self, context: RequestContext, map_run_arn: LongArn, **kwargs
    ) -> DescribeMapRunOutput:
        """Describe the map run identified by ``map_run_arn``.

        Every stored execution is asked for the record; the first hit is described.

        :raises ResourceNotFound: if no execution holds a record for the ARN.
        """
        for execution in self.get_store(context).executions.values():
            record: MapRunRecord | None = (
                execution.exec_worker.env.map_run_record_pool_manager.get(map_run_arn)
            )
            if record is None:
                continue
            return record.describe()
        raise ResourceNotFound()
×
1451

1452
    def list_map_runs(
        self,
        context: RequestContext,
        execution_arn: Arn,
        max_results: PageSize = None,
        next_token: PageToken = None,
        **kwargs,
    ) -> ListMapRunsOutput:
        """List all map runs recorded for the given execution.

        ``max_results`` and ``next_token`` are accepted but not read.
        """
        # TODO: add support for paging.
        execution = self._get_execution(context=context, execution_arn=execution_arn)
        record_pool = execution.exec_worker.env.map_run_record_pool_manager
        records: list[MapRunRecord] = record_pool.get_all()

        list_items = []
        for record in records:
            list_items.append(record.list_item())
        return ListMapRunsOutput(mapRuns=list_items)
1468

1469
    def update_map_run(
        self,
        context: RequestContext,
        map_run_arn: LongArn,
        max_concurrency: MaxConcurrency = None,
        tolerated_failure_percentage: ToleratedFailurePercentage = None,
        tolerated_failure_count: ToleratedFailureCount = None,
        **kwargs,
    ) -> UpdateMapRunOutput:
        """Update the record of the map run identified by ``map_run_arn``.

        :raises NotImplementedError: if either tolerated-failure argument is given.
        :raises ResourceNotFound: if no execution holds a record for the ARN.
        """
        failure_settings_given = not (
            tolerated_failure_percentage is None and tolerated_failure_count is None
        )
        if failure_settings_given:
            raise NotImplementedError(
                "Updating of ToleratedFailureCount and ToleratedFailurePercentage is currently unsupported."
            )

        # TODO: investigate behaviour of empty requests.
        for execution in self.get_store(context).executions.values():
            record: MapRunRecord | None = (
                execution.exec_worker.env.map_run_record_pool_manager.get(map_run_arn)
            )
            if record is None:
                continue

            record.update(
                max_concurrency=max_concurrency,
                tolerated_failure_count=tolerated_failure_count,
                tolerated_failure_percentage=tolerated_failure_percentage,
            )
            LOG.warning(
                "StepFunctions UpdateMapRun changes are currently not being reflected in the MapRun instances."
            )
            return UpdateMapRunOutput()

        raise ResourceNotFound()
×
1499

1500
    def test_state(
        self, context: RequestContext, request: TestStateInput, **kwargs
    ) -> TestStateOutput:
        """Handle the TestState operation for a single state of a definition.

        The definition is statically validated, the optional mock input is checked,
        and a throw-away ``TestStateMachine`` / ``TestStateExecution`` pair is built
        and started; the execution then renders the output.

        :param context: request context; supplies the account id, region and store.
        :param request: the TestState request. ``definition`` is required;
            ``stateName``, ``mock``, ``stateConfiguration``, ``context``, ``roleArn``,
            ``input`` and ``inspectionLevel`` are read when present.
        :return: the execution's test-state output at the requested inspection level
            (``InspectionLevel.INFO`` when none is given).
        :raises ValidationException: if ``stateName`` is given but not found in the
            definition, or if ``TestStateMock`` raises a ``ValueError``.
        """
        state_name = request.get("stateName")
        definition = request["definition"]

        StepFunctionsProvider._validate_definition(
            definition=definition,
            static_analysers=[TestStateStaticAnalyser(state_name)],
        )

        # if StateName is present, we need to ensure the state being referenced exists in full definition.
        if state_name and not TestStateStaticAnalyser.is_state_in_definition(
            definition=definition, state_name=state_name
        ):
            raise ValidationException("State not found in definition")

        mock_input = request.get("mock")
        if mock_input is not None:
            self._validate_test_state_mock_input(mock_input)
            TestStateStaticAnalyser.validate_mock(
                mock_input=mock_input, definition=definition, state_name=state_name
            )

        # TODO: Add validations for this i.e assert len(input) <= failureCount
        state_configuration = request.get("stateConfiguration")

        # TODO: Add validation ensuring only present if 'mock' is specified
        # An error occurred (ValidationException) when calling the TestState operation: State type 'Pass' is not supported when a mock is specified
        state_context = request.get("context")

        try:
            state_mock = TestStateMock(
                mock_input=mock_input,
                state_configuration=state_configuration,
                context=state_context,
            )
        except ValueError as e:
            LOG.error(e)
            # Chain the original error so the root cause stays visible in tracebacks.
            raise ValidationException(f"Invalid Context object provided: {e}") from e

        name: Name = f"TestState-{short_uid()}"
        arn = stepfunctions_state_machine_arn(
            name=name, account_id=context.account_id, region_name=context.region
        )
        role_arn = request.get("roleArn")
        if role_arn is None:
            TestStateStaticAnalyser.validate_role_arn_required(
                mock_input=mock_input, definition=definition, state_name=state_name
            )
            # HACK: Added dummy role ARN because it is a required field in Execution.
            # To allow optional roleArn for the test state but preserve the mandatory one for regular executions
            # we likely need to remove inheritance TestStateExecution(Execution) in favor of composition.
            # TestState execution starts to have too many simplifications compared to a regular execution
            # which renders the inheritance mechanism harmful.
            # TODO make role_arn optional in TestStateExecution
            role_arn = arns.iam_role_arn(
                role_name=f"RoleFor-{name}",
                account_id=context.account_id,
                region_name=context.region,
            )

        state_machine = TestStateMachine(
            name=name,
            arn=arn,
            role_arn=role_arn,
            definition=definition,
        )

        # HACK(gregfurman): The ARN that gets generated has a duplicate 'name' field in the
        # resource ARN. Just replace this duplication and extract the execution ID.
        exec_arn = stepfunctions_express_execution_arn(state_machine.arn, name)
        exec_arn = exec_arn.replace(f":{name}:{name}:", f":{name}:", 1)
        _, exec_name = exec_arn.rsplit(":", 1)

        # A missing or empty input is passed through as-is; only a non-empty
        # input is parsed as JSON.
        input_json = request.get("input", {})
        if input_json:
            input_json = json.loads(input_json)

        execution = TestStateExecution(
            name=exec_name,
            role_arn=role_arn,
            exec_arn=exec_arn,
            account_id=context.account_id,
            region_name=context.region,
            state_machine=state_machine,
            start_date=datetime.datetime.now(tz=datetime.UTC),
            input_data=input_json,
            state_name=state_name,
            activity_store=self.get_store(context).activities,
            mock=state_mock,
        )
        execution.start()

        return execution.to_test_state_output(
            inspection_level=request.get("inspectionLevel", InspectionLevel.INFO)
        )
1✔
1600

1601
    def create_activity(
        self,
        context: RequestContext,
        name: Name,
        tags: TagList = None,
        encryption_configuration: EncryptionConfiguration = None,
        **kwargs,
    ) -> CreateActivityOutput:
        """Create an activity, or return the existing one registered under the same ARN.

        ``tags`` and ``encryption_configuration`` are accepted but not read.

        :return: ARN and creation date of the new or pre-existing activity.
        """
        self._validate_activity_name(name=name)

        activity_arn = stepfunctions_activity_arn(
            name=name, account_id=context.account_id, region_name=context.region
        )
        known_activities = self.get_store(context).activities
        if activity_arn in known_activities:
            # Already registered: hand back the stored activity unchanged.
            activity = known_activities[activity_arn]
        else:
            activity = Activity(arn=activity_arn, name=name)
            known_activities[activity_arn] = activity

        return CreateActivityOutput(activityArn=activity.arn, creationDate=activity.creation_date)
1✔
1622

1623
    def delete_activity(
        self, context: RequestContext, activity_arn: Arn, **kwargs
    ) -> DeleteActivityOutput:
        """Remove the activity stored under ``activity_arn``, if there is one."""
        self._validate_activity_arn(activity_arn)
        known_activities = self.get_store(context).activities
        # A default is supplied to pop() so an unknown ARN does not raise.
        known_activities.pop(activity_arn, None)
        return DeleteActivityOutput()
1✔
1629

1630
    def describe_activity(
        self, context: RequestContext, activity_arn: Arn, **kwargs
    ) -> DescribeActivityOutput:
        """Return the describe-output of the activity identified by ``activity_arn``."""
        self._validate_activity_arn(activity_arn)
        return self._get_activity(
            context=context, activity_arn=activity_arn
        ).to_describe_activity_output()
1✔
1636

1637
    def list_activities(
        self,
        context: RequestContext,
        max_results: PageSize = None,
        next_token: PageToken = None,
        **kwargs,
    ) -> ListActivitiesOutput:
        """List every activity in the store.

        ``max_results`` and ``next_token`` are accepted but not read.
        """
        # Copy the values into a list before building the response items.
        snapshot: list[Activity] = list(self.get_store(context).activities.values())
        list_items = []
        for activity in snapshot:
            list_items.append(activity.to_activity_list_item())
        return ListActivitiesOutput(activities=list_items)
1648

1649
    def _send_activity_task_started(
        self,
        context: RequestContext,
        task_token: TaskToken,
        worker_name: Name | None,
    ) -> None:
        """Notify the activity callback endpoint registered for ``task_token``.

        Executions are searched in order; the first one whose callback pool returns
        an ``ActivityCallbackEndpoint`` for the token is notified.

        :raises InvalidToken: if no execution yields such an endpoint.
        """
        candidates: list[Execution] = self._get_executions(context)
        for execution in candidates:
            endpoint = execution.exec_worker.env.callback_pool_manager.get(
                callback_id=task_token
            )
            if not isinstance(endpoint, ActivityCallbackEndpoint):
                continue
            endpoint.notify_activity_task_start(worker_name=worker_name)
            return
        raise InvalidToken()
×
1664

1665
    @staticmethod
    def _pull_activity_task(activity: Activity) -> ActivityTask | None:
        """Poll ``activity`` for a task, retrying once per second.

        :return: the first task obtained, or ``None`` after 60 attempts that each
            raised ``IndexError``.
        """
        for _ in range(60):
            try:
                return activity.get_task()
            except IndexError:
                # get_task() raised IndexError: wait a second and retry.
                time.sleep(1)
        return None
×
1675

1676
    def get_activity_task(
        self,
        context: RequestContext,
        activity_arn: Arn,
        worker_name: Name = None,
        **kwargs,
    ) -> GetActivityTaskOutput:
        """Fetch the next task of an activity and mark it as started.

        :return: token and input of the pulled task, or an output with both fields
            set to ``None`` when ``_pull_activity_task`` yields nothing.
        """
        self._validate_activity_arn(activity_arn)

        activity = self._get_activity(context=context, activity_arn=activity_arn)
        pulled: ActivityTask | None = self._pull_activity_task(activity=activity)
        if pulled is None:
            return GetActivityTaskOutput(taskToken=None, input=None)

        self._send_activity_task_started(context, pulled.task_token, worker_name=worker_name)
        return GetActivityTaskOutput(taskToken=pulled.task_token, input=pulled.task_input)
×
1696

1697
    def validate_state_machine_definition(
        self, context: RequestContext, request: ValidateStateMachineDefinitionInput, **kwargs
    ) -> ValidateStateMachineDefinitionOutput:
        """Statically validate a definition and report the outcome.

        ``StaticAnalyser`` is used when the request type is STANDARD (the default),
        ``ExpressStaticAnalyser`` otherwise. An ``InvalidDefinition`` yields a FAIL
        result with one diagnostic; any other exception yields FAIL with no
        diagnostics and is logged.
        """
        # TODO: increase parity of static analysers, current implementation is an unblocker for this API action.
        # TODO: add support for ValidateStateMachineDefinitionSeverity
        # TODO: add support for ValidateStateMachineDefinitionMaxResult

        state_machine_type: StateMachineType = request.get("type", StateMachineType.STANDARD)
        definition: str = request["definition"]

        if state_machine_type == StateMachineType.STANDARD:
            analyser = StaticAnalyser()
        else:
            analyser = ExpressStaticAnalyser()

        diagnostics: ValidateStateMachineDefinitionDiagnosticList = []
        try:
            StepFunctionsProvider._validate_definition(
                definition=definition, static_analysers=[analyser]
            )
        except InvalidDefinition as invalid_definition:
            outcome = ValidateStateMachineDefinitionResultCode.FAIL
            diagnostic = ValidateStateMachineDefinitionDiagnostic(
                severity=ValidateStateMachineDefinitionSeverity.ERROR,
                code="SCHEMA_VALIDATION_FAILED",
                message=invalid_definition.message,
            )
            diagnostics.append(diagnostic)
        except Exception as ex:
            outcome = ValidateStateMachineDefinitionResultCode.FAIL
            LOG.error("Unknown error during validation %s", ex)
        else:
            outcome = ValidateStateMachineDefinitionResultCode.OK

        return ValidateStateMachineDefinitionOutput(
            result=outcome, diagnostics=diagnostics, truncated=False
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc