• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19656300538

24 Nov 2025 06:52PM UTC coverage: 86.867% (-0.01%) from 86.879%
19656300538

push

github

web-flow
CFn: validate conditions exist in Fn::If (#13243)

3 of 4 new or added lines in 1 file covered. (75.0%)

236 existing lines in 7 files now uncovered.

68861 of 79272 relevant lines covered (86.87%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.49
/localstack-core/localstack/services/stepfunctions/provider.py
1
import copy
1✔
2
import datetime
1✔
3
import json
1✔
4
import logging
1✔
5
import re
1✔
6
import time
1✔
7
from typing import Final
1✔
8

9
from localstack.aws.api import CommonServiceException, RequestContext
1✔
10
from localstack.aws.api.stepfunctions import (
1✔
11
    ActivityDoesNotExist,
12
    AliasDescription,
13
    Arn,
14
    CharacterRestrictedName,
15
    ConflictException,
16
    CreateActivityOutput,
17
    CreateStateMachineAliasOutput,
18
    CreateStateMachineInput,
19
    CreateStateMachineOutput,
20
    Definition,
21
    DeleteActivityOutput,
22
    DeleteStateMachineAliasOutput,
23
    DeleteStateMachineOutput,
24
    DeleteStateMachineVersionOutput,
25
    DescribeActivityOutput,
26
    DescribeExecutionOutput,
27
    DescribeMapRunOutput,
28
    DescribeStateMachineAliasOutput,
29
    DescribeStateMachineForExecutionOutput,
30
    DescribeStateMachineOutput,
31
    EncryptionConfiguration,
32
    ExecutionDoesNotExist,
33
    ExecutionList,
34
    ExecutionRedriveFilter,
35
    ExecutionStatus,
36
    GetActivityTaskOutput,
37
    GetExecutionHistoryOutput,
38
    IncludedData,
39
    IncludeExecutionDataGetExecutionHistory,
40
    InspectionLevel,
41
    InvalidArn,
42
    InvalidDefinition,
43
    InvalidExecutionInput,
44
    InvalidLoggingConfiguration,
45
    InvalidName,
46
    InvalidToken,
47
    ListActivitiesOutput,
48
    ListExecutionsOutput,
49
    ListExecutionsPageToken,
50
    ListMapRunsOutput,
51
    ListStateMachineAliasesOutput,
52
    ListStateMachinesOutput,
53
    ListStateMachineVersionsOutput,
54
    ListTagsForResourceOutput,
55
    LoggingConfiguration,
56
    LogLevel,
57
    LongArn,
58
    MaxConcurrency,
59
    MissingRequiredParameter,
60
    Name,
61
    PageSize,
62
    PageToken,
63
    Publish,
64
    PublishStateMachineVersionOutput,
65
    ResourceNotFound,
66
    ReverseOrder,
67
    RevisionId,
68
    RoutingConfigurationList,
69
    SendTaskFailureOutput,
70
    SendTaskHeartbeatOutput,
71
    SendTaskSuccessOutput,
72
    SensitiveCause,
73
    SensitiveData,
74
    SensitiveError,
75
    StartExecutionOutput,
76
    StartSyncExecutionOutput,
77
    StateMachineAliasList,
78
    StateMachineAlreadyExists,
79
    StateMachineDoesNotExist,
80
    StateMachineList,
81
    StateMachineType,
82
    StateMachineTypeNotSupported,
83
    StepfunctionsApi,
84
    StopExecutionOutput,
85
    TagKeyList,
86
    TagList,
87
    TagResourceOutput,
88
    TaskDoesNotExist,
89
    TaskTimedOut,
90
    TaskToken,
91
    TestStateInput,
92
    TestStateOutput,
93
    ToleratedFailureCount,
94
    ToleratedFailurePercentage,
95
    TraceHeader,
96
    TracingConfiguration,
97
    UntagResourceOutput,
98
    UpdateMapRunOutput,
99
    UpdateStateMachineAliasOutput,
100
    UpdateStateMachineOutput,
101
    ValidateStateMachineDefinitionDiagnostic,
102
    ValidateStateMachineDefinitionDiagnosticList,
103
    ValidateStateMachineDefinitionInput,
104
    ValidateStateMachineDefinitionOutput,
105
    ValidateStateMachineDefinitionResultCode,
106
    ValidateStateMachineDefinitionSeverity,
107
    ValidationException,
108
    VersionDescription,
109
)
110
from localstack.services.plugins import ServiceLifecycleHook
1✔
111
from localstack.services.stepfunctions.asl.component.state.state_execution.state_map.iteration.itemprocessor.map_run_record import (
1✔
112
    MapRunRecord,
113
)
114
from localstack.services.stepfunctions.asl.eval.callback.callback import (
1✔
115
    ActivityCallbackEndpoint,
116
    CallbackConsumerTimeout,
117
    CallbackNotifyConsumerError,
118
    CallbackOutcomeFailure,
119
    CallbackOutcomeSuccess,
120
)
121
from localstack.services.stepfunctions.asl.eval.event.logging import (
1✔
122
    CloudWatchLoggingConfiguration,
123
    CloudWatchLoggingSession,
124
)
125
from localstack.services.stepfunctions.asl.parse.asl_parser import (
1✔
126
    ASLParserException,
127
)
128
from localstack.services.stepfunctions.asl.static_analyser.express_static_analyser import (
1✔
129
    ExpressStaticAnalyser,
130
)
131
from localstack.services.stepfunctions.asl.static_analyser.static_analyser import (
1✔
132
    StaticAnalyser,
133
)
134
from localstack.services.stepfunctions.asl.static_analyser.test_state.test_state_analyser import (
1✔
135
    TestStateStaticAnalyser,
136
)
137
from localstack.services.stepfunctions.asl.static_analyser.usage_metrics_static_analyser import (
1✔
138
    UsageMetricsStaticAnalyser,
139
)
140
from localstack.services.stepfunctions.backend.activity import Activity, ActivityTask
1✔
141
from localstack.services.stepfunctions.backend.alias import Alias
1✔
142
from localstack.services.stepfunctions.backend.execution import Execution, SyncExecution
1✔
143
from localstack.services.stepfunctions.backend.state_machine import (
1✔
144
    StateMachineInstance,
145
    StateMachineRevision,
146
    StateMachineVersion,
147
    TestStateMachine,
148
)
149
from localstack.services.stepfunctions.backend.store import SFNStore, sfn_stores
1✔
150
from localstack.services.stepfunctions.backend.test_state.execution import (
1✔
151
    TestStateExecution,
152
)
153
from localstack.services.stepfunctions.mocking.mock_config import (
1✔
154
    MockTestCase,
155
    load_mock_test_case_for,
156
)
157
from localstack.services.stepfunctions.stepfunctions_utils import (
1✔
158
    assert_pagination_parameters_valid,
159
    get_next_page_token_from_arn,
160
    normalise_max_results,
161
)
162
from localstack.state import StateVisitor
1✔
163
from localstack.utils.aws.arns import (
1✔
164
    ARN_PARTITION_REGEX,
165
    stepfunctions_activity_arn,
166
    stepfunctions_express_execution_arn,
167
    stepfunctions_standard_execution_arn,
168
    stepfunctions_state_machine_arn,
169
)
170
from localstack.utils.collections import PaginatedList
1✔
171
from localstack.utils.strings import long_uid, short_uid
1✔
172

173
LOG = logging.getLogger(__name__)
1✔
174

175

176
class StepFunctionsProvider(StepfunctionsApi, ServiceLifecycleHook):
1✔
177
    _TEST_STATE_MAX_TIMEOUT_SECONDS: Final[int] = 300  # 5 minutes.
1✔
178

179
    @staticmethod
1✔
180
    def get_store(context: RequestContext) -> SFNStore:
1✔
181
        return sfn_stores[context.account_id][context.region]
1✔
182

183
    def accept_state_visitor(self, visitor: StateVisitor):
1✔
184
        visitor.visit(sfn_stores)
×
185

186
    _STATE_MACHINE_ARN_REGEX: Final[re.Pattern] = re.compile(
1✔
187
        rf"{ARN_PARTITION_REGEX}:states:[a-z0-9-]+:[0-9]{{12}}:stateMachine:[a-zA-Z0-9-_.]+(:\d+)?(:[a-zA-Z0-9-_.]+)*(?:#[a-zA-Z0-9-_]+)?$"
188
    )
189

190
    _STATE_MACHINE_EXECUTION_ARN_REGEX: Final[re.Pattern] = re.compile(
1✔
191
        rf"{ARN_PARTITION_REGEX}:states:[a-z0-9-]+:[0-9]{{12}}:(stateMachine|execution|express):[a-zA-Z0-9-_.]+(:\d+)?(:[a-zA-Z0-9-_.]+)*$"
192
    )
193

194
    _ACTIVITY_ARN_REGEX: Final[re.Pattern] = re.compile(
1✔
195
        rf"{ARN_PARTITION_REGEX}:states:[a-z0-9-]+:[0-9]{{12}}:activity:[a-zA-Z0-9-_\.]{{1,80}}$"
196
    )
197

198
    _ALIAS_ARN_REGEX: Final[re.Pattern] = re.compile(
1✔
199
        rf"{ARN_PARTITION_REGEX}:states:[a-z0-9-]+:[0-9]{{12}}:stateMachine:[A-Za-z0-9_.-]+:[A-Za-z_.-]+[A-Za-z0-9_.-]{{0,80}}$"
200
    )
201

202
    _ALIAS_NAME_REGEX: Final[re.Pattern] = re.compile(r"^(?=.*[a-zA-Z_\-\.])[a-zA-Z0-9_\-\.]+$")
1✔
203

204
    @staticmethod
1✔
205
    def _validate_state_machine_arn(state_machine_arn: str) -> None:
1✔
206
        # TODO: InvalidArn exception message do not communicate which part of the ARN is incorrect.
207
        if not StepFunctionsProvider._STATE_MACHINE_ARN_REGEX.match(state_machine_arn):
1✔
208
            raise InvalidArn(f"Invalid arn: '{state_machine_arn}'")
1✔
209

210
    @staticmethod
1✔
211
    def _raise_state_machine_does_not_exist(state_machine_arn: str) -> None:
1✔
212
        raise StateMachineDoesNotExist(f"State Machine Does Not Exist: '{state_machine_arn}'")
1✔
213

214
    @staticmethod
1✔
215
    def _validate_state_machine_execution_arn(execution_arn: str) -> None:
1✔
216
        # TODO: InvalidArn exception message do not communicate which part of the ARN is incorrect.
217
        if not StepFunctionsProvider._STATE_MACHINE_EXECUTION_ARN_REGEX.match(execution_arn):
1✔
218
            raise InvalidArn(f"Invalid arn: '{execution_arn}'")
1✔
219

220
    @staticmethod
1✔
221
    def _validate_activity_arn(activity_arn: str) -> None:
1✔
222
        # TODO: InvalidArn exception message do not communicate which part of the ARN is incorrect.
223
        if not StepFunctionsProvider._ACTIVITY_ARN_REGEX.match(activity_arn):
1✔
224
            raise InvalidArn(f"Invalid arn: '{activity_arn}'")
1✔
225

226
    @staticmethod
1✔
227
    def _validate_state_machine_alias_arn(state_machine_alias_arn: Arn) -> None:
1✔
228
        if not StepFunctionsProvider._ALIAS_ARN_REGEX.match(state_machine_alias_arn):
1✔
229
            raise InvalidArn(f"Invalid arn: '{state_machine_alias_arn}'")
×
230

231
    def _raise_state_machine_type_not_supported(self):
1✔
232
        raise StateMachineTypeNotSupported(
1✔
233
            "This operation is not supported by this type of state machine"
234
        )
235

236
    @staticmethod
1✔
237
    def _raise_resource_type_not_in_context(resource_type: str) -> None:
1✔
238
        lower_resource_type = resource_type.lower()
1✔
239
        raise InvalidArn(
1✔
240
            f"Invalid Arn: 'Resource type not valid in this context: {lower_resource_type}'"
241
        )
242

243
    @staticmethod
1✔
244
    def _validate_activity_name(name: str) -> None:
1✔
245
        # The activity name is validated according to the AWS StepFunctions documentation, the name should not contain:
246
        # - white space
247
        # - brackets < > { } [ ]
248
        # - wildcard characters ? *
249
        # - special characters " # % \ ^ | ~ ` $ & , ; : /
250
        # - control characters (U+0000-001F, U+007F-009F)
251
        # https://docs.aws.amazon.com/step-functions/latest/apireference/API_CreateActivity.html#API_CreateActivity_RequestSyntax
252
        if not (1 <= len(name) <= 80):
1✔
253
            raise InvalidName(f"Invalid Name: '{name}'")
×
254
        invalid_chars = set(' <>{}[]?*"#%\\^|~`$&,;:/')
1✔
255
        control_chars = {chr(i) for i in range(32)} | {chr(i) for i in range(127, 160)}
1✔
256
        invalid_chars |= control_chars
1✔
257
        for char in name:
1✔
258
            if char in invalid_chars:
1✔
259
                raise InvalidName(f"Invalid Name: '{name}'")
1✔
260

261
    @staticmethod
1✔
262
    def _validate_state_machine_alias_name(name: CharacterRestrictedName) -> None:
1✔
263
        len_name = len(name)
1✔
264
        if len_name > 80:
1✔
265
            raise ValidationException(
1✔
266
                f"1 validation error detected: Value '{name}' at 'name' failed to satisfy constraint: "
267
                f"Member must have length less than or equal to 80"
268
            )
269
        if not StepFunctionsProvider._ALIAS_NAME_REGEX.match(name):
1✔
270
            raise ValidationException(
1✔
271
                # TODO: explore more error cases in which more than one validation error may occur which results
272
                #  in the counter below being greater than 1.
273
                f"1 validation error detected: Value '{name}' at 'name' failed to satisfy constraint: "
274
                f"Member must satisfy regular expression pattern: ^(?=.*[a-zA-Z_\\-\\.])[a-zA-Z0-9_\\-\\.]+$"
275
            )
276

277
    def _get_execution(self, context: RequestContext, execution_arn: Arn) -> Execution:
1✔
278
        execution: Execution | None = self.get_store(context).executions.get(execution_arn)
1✔
279
        if not execution:
1✔
280
            raise ExecutionDoesNotExist(f"Execution Does Not Exist: '{execution_arn}'")
1✔
281
        return execution
1✔
282

283
    def _get_executions(
1✔
284
        self,
285
        context: RequestContext,
286
        execution_status: ExecutionStatus | None = None,
287
    ):
288
        store = self.get_store(context)
1✔
289
        execution: list[Execution] = list(store.executions.values())
1✔
290
        if execution_status:
1✔
291
            execution = list(
1✔
292
                filter(
293
                    lambda e: e.exec_status == execution_status,
294
                    store.executions.values(),
295
                )
296
            )
297
        return execution
1✔
298

299
    def _get_activity(self, context: RequestContext, activity_arn: Arn) -> Activity:
1✔
300
        maybe_activity: Activity | None = self.get_store(context).activities.get(activity_arn, None)
1✔
301
        if maybe_activity is None:
1✔
302
            raise ActivityDoesNotExist(f"Activity Does Not Exist: '{activity_arn}'")
1✔
303
        return maybe_activity
1✔
304

305
    def _idempotent_revision(
1✔
306
        self,
307
        context: RequestContext,
308
        name: str,
309
        definition: Definition,
310
        state_machine_type: StateMachineType,
311
        logging_configuration: LoggingConfiguration,
312
        tracing_configuration: TracingConfiguration,
313
    ) -> StateMachineRevision | None:
314
        # CreateStateMachine's idempotency check is based on the state machine name, definition, type,
315
        # LoggingConfiguration and TracingConfiguration.
316
        # If a following request has a different roleArn or tags, Step Functions will ignore these differences and
317
        # treat it as an idempotent request of the previous. In this case, roleArn and tags will not be updated, even
318
        # if they are different.
319
        state_machines: list[StateMachineInstance] = list(
1✔
320
            self.get_store(context).state_machines.values()
321
        )
322
        revisions = filter(lambda sm: isinstance(sm, StateMachineRevision), state_machines)
1✔
323
        for state_machine in revisions:
1✔
324
            check = all(
1✔
325
                [
326
                    state_machine.name == name,
327
                    state_machine.definition == definition,
328
                    state_machine.sm_type == state_machine_type,
329
                    state_machine.logging_config == logging_configuration,
330
                    state_machine.tracing_config == tracing_configuration,
331
                ]
332
            )
333
            if check:
1✔
334
                return state_machine
1✔
335
        return None
1✔
336

337
    def _idempotent_start_execution(
1✔
338
        self,
339
        execution: Execution | None,
340
        state_machine: StateMachineInstance,
341
        name: Name,
342
        input_data: SensitiveData,
343
    ) -> Execution | None:
344
        # StartExecution is idempotent for STANDARD workflows. For a STANDARD workflow,
345
        # if you call StartExecution with the same name and input as a running execution,
346
        # the call succeeds and return the same response as the original request.
347
        # If the execution is closed or if the input is different,
348
        # it returns a 400 ExecutionAlreadyExists error. You can reuse names after 90 days.
349

350
        if not execution:
1✔
351
            return None
×
352

353
        match (name, input_data, execution.exec_status, state_machine.sm_type):
1✔
354
            case (
1✔
355
                execution.name,
356
                execution.input_data,
357
                ExecutionStatus.RUNNING,
358
                StateMachineType.STANDARD,
359
            ):
360
                return execution
1✔
361

362
        raise CommonServiceException(
1✔
363
            code="ExecutionAlreadyExists",
364
            message=f"Execution Already Exists: '{execution.exec_arn}'",
365
        )
366

367
    def _revision_by_name(self, context: RequestContext, name: str) -> StateMachineInstance | None:
1✔
368
        state_machines: list[StateMachineInstance] = list(
1✔
369
            self.get_store(context).state_machines.values()
370
        )
371
        for state_machine in state_machines:
1✔
372
            if isinstance(state_machine, StateMachineRevision) and state_machine.name == name:
1✔
373
                return state_machine
1✔
374
        return None
1✔
375

376
    @staticmethod
1✔
377
    def _validate_definition(definition: str, static_analysers: list[StaticAnalyser]) -> None:
1✔
378
        try:
1✔
379
            for static_analyser in static_analysers:
1✔
380
                static_analyser.analyse(definition)
1✔
381
        except ASLParserException as asl_parser_exception:
1✔
382
            invalid_definition = InvalidDefinition()
1✔
383
            invalid_definition.message = repr(asl_parser_exception)
1✔
384
            raise invalid_definition
1✔
385
        except Exception as exception:
1✔
386
            exception_name = exception.__class__.__name__
1✔
387
            exception_args = list(exception.args)
1✔
388
            invalid_definition = InvalidDefinition()
1✔
389
            invalid_definition.message = (
1✔
390
                f"Error={exception_name} Args={exception_args} in definition '{definition}'."
391
            )
392
            raise invalid_definition
1✔
393

394
    @staticmethod
1✔
395
    def _sanitise_logging_configuration(
1✔
396
        logging_configuration: LoggingConfiguration,
397
    ) -> None:
398
        level = logging_configuration.get("level")
1✔
399
        destinations = logging_configuration.get("destinations")
1✔
400

401
        if destinations is not None and len(destinations) > 1:
1✔
402
            raise InvalidLoggingConfiguration(
1✔
403
                "Invalid Logging Configuration: Must specify exactly one Log Destination."
404
            )
405

406
        # A LogLevel that is not OFF, should have a destination.
407
        if level is not None and level != LogLevel.OFF and not destinations:
1✔
408
            raise InvalidLoggingConfiguration(
1✔
409
                "Invalid Logging Configuration: Must specify exactly one Log Destination."
410
            )
411

412
        # Default for level is OFF.
413
        level = level or LogLevel.OFF
1✔
414

415
        # Default for includeExecutionData is False.
416
        include_flag = logging_configuration.get("includeExecutionData", False)
1✔
417

418
        # Update configuration object.
419
        logging_configuration["level"] = level
1✔
420
        logging_configuration["includeExecutionData"] = include_flag
1✔
421

422
    def create_state_machine(
1✔
423
        self, context: RequestContext, request: CreateStateMachineInput, **kwargs
424
    ) -> CreateStateMachineOutput:
425
        if not request.get("publish", False) and request.get("versionDescription"):
1✔
426
            raise ValidationException("Version description can only be set when publish is true")
1✔
427

428
        # Extract parameters and set defaults.
429
        state_machine_name = request["name"]
1✔
430
        state_machine_role_arn = request["roleArn"]
1✔
431
        state_machine_definition = request["definition"]
1✔
432
        state_machine_type = request.get("type") or StateMachineType.STANDARD
1✔
433
        state_machine_tracing_configuration = request.get("tracingConfiguration")
1✔
434
        state_machine_tags = request.get("tags")
1✔
435
        state_machine_logging_configuration = request.get(
1✔
436
            "loggingConfiguration", LoggingConfiguration()
437
        )
438
        self._sanitise_logging_configuration(
1✔
439
            logging_configuration=state_machine_logging_configuration
440
        )
441

442
        # CreateStateMachine is an idempotent API. Subsequent requests won't create a duplicate resource if it was
443
        # already created.
444
        idem_state_machine: StateMachineRevision | None = self._idempotent_revision(
1✔
445
            context=context,
446
            name=state_machine_name,
447
            definition=state_machine_definition,
448
            state_machine_type=state_machine_type,
449
            logging_configuration=state_machine_logging_configuration,
450
            tracing_configuration=state_machine_tracing_configuration,
451
        )
452
        if idem_state_machine is not None:
1✔
453
            return CreateStateMachineOutput(
1✔
454
                stateMachineArn=idem_state_machine.arn,
455
                creationDate=idem_state_machine.create_date,
456
            )
457

458
        # Assert this state machine name is unique.
459
        state_machine_with_name: StateMachineRevision | None = self._revision_by_name(
1✔
460
            context=context, name=state_machine_name
461
        )
462
        if state_machine_with_name is not None:
1✔
463
            raise StateMachineAlreadyExists(
1✔
464
                f"State Machine Already Exists: '{state_machine_with_name.arn}'"
465
            )
466

467
        # Compute the state machine's Arn.
468
        state_machine_arn = stepfunctions_state_machine_arn(
1✔
469
            name=state_machine_name,
470
            account_id=context.account_id,
471
            region_name=context.region,
472
        )
473
        state_machines = self.get_store(context).state_machines
1✔
474

475
        # Reduce the logging configuration to a usable cloud watch representation, and validate the destinations
476
        # if any were given.
477
        cloud_watch_logging_configuration = (
1✔
478
            CloudWatchLoggingConfiguration.from_logging_configuration(
479
                state_machine_arn=state_machine_arn,
480
                logging_configuration=state_machine_logging_configuration,
481
            )
482
        )
483
        if cloud_watch_logging_configuration is not None:
1✔
484
            cloud_watch_logging_configuration.validate()
1✔
485

486
        # Run static analysers on the definition given.
487
        if state_machine_type == StateMachineType.EXPRESS:
1✔
488
            StepFunctionsProvider._validate_definition(
1✔
489
                definition=state_machine_definition,
490
                static_analysers=[ExpressStaticAnalyser()],
491
            )
492
        else:
493
            StepFunctionsProvider._validate_definition(
1✔
494
                definition=state_machine_definition, static_analysers=[StaticAnalyser()]
495
            )
496

497
        # Create the state machine and add it to the store.
498
        state_machine = StateMachineRevision(
1✔
499
            name=state_machine_name,
500
            arn=state_machine_arn,
501
            role_arn=state_machine_role_arn,
502
            definition=state_machine_definition,
503
            sm_type=state_machine_type,
504
            logging_config=state_machine_logging_configuration,
505
            cloud_watch_logging_configuration=cloud_watch_logging_configuration,
506
            tracing_config=state_machine_tracing_configuration,
507
            tags=state_machine_tags,
508
        )
509
        state_machines[state_machine_arn] = state_machine
1✔
510

511
        create_output = CreateStateMachineOutput(
1✔
512
            stateMachineArn=state_machine.arn, creationDate=state_machine.create_date
513
        )
514

515
        # Create the first version if the 'publish' flag is used.
516
        if request.get("publish", False):
1✔
517
            version_description = request.get("versionDescription")
1✔
518
            state_machine_version = state_machine.create_version(description=version_description)
1✔
519
            if state_machine_version is not None:
1✔
520
                state_machine_version_arn = state_machine_version.arn
1✔
521
                state_machines[state_machine_version_arn] = state_machine_version
1✔
522
                create_output["stateMachineVersionArn"] = state_machine_version_arn
1✔
523

524
        # Run static analyser on definition and collect usage metrics
525
        UsageMetricsStaticAnalyser.process(state_machine_definition)
1✔
526

527
        return create_output
1✔
528

529
    def _validate_state_machine_alias_routing_configuration(
1✔
530
        self, context: RequestContext, routing_configuration_list: RoutingConfigurationList
531
    ) -> None:
532
        # TODO: to match AWS's approach best validation exceptions could be
533
        #  built in a process decoupled from the provider.
534

535
        routing_configuration_list_len = len(routing_configuration_list)
1✔
536
        if not (1 <= routing_configuration_list_len <= 2):
1✔
537
            # Replicate the object string dump format:
538
            # [RoutingConfigurationListItem(stateMachineVersionArn=arn_no_quotes, weight=int), ...]
539
            routing_configuration_serialization_parts = []
1✔
540
            for routing_configuration in routing_configuration_list:
1✔
541
                routing_configuration_serialization_parts.append(
1✔
542
                    "".join(
543
                        [
544
                            "RoutingConfigurationListItem(stateMachineVersionArn=",
545
                            routing_configuration["stateMachineVersionArn"],
546
                            ", weight=",
547
                            str(routing_configuration["weight"]),
548
                            ")",
549
                        ]
550
                    )
551
                )
552
            routing_configuration_serialization_list = (
1✔
553
                f"[{', '.join(routing_configuration_serialization_parts)}]"
554
            )
555
            raise ValidationException(
1✔
556
                f"1 validation error detected: Value '{routing_configuration_serialization_list}' "
557
                "at 'routingConfiguration' failed to "
558
                "satisfy constraint: Member must have length less than or equal to 2"
559
            )
560

561
        routing_configuration_arn_list = [
1✔
562
            routing_configuration["stateMachineVersionArn"]
563
            for routing_configuration in routing_configuration_list
564
        ]
565
        if len(set(routing_configuration_arn_list)) < routing_configuration_list_len:
1✔
566
            arn_list_string = f"[{', '.join(routing_configuration_arn_list)}]"
1✔
567
            raise ValidationException(
1✔
568
                "Routing configuration must contain distinct state machine version ARNs. "
569
                f"Received: {arn_list_string}"
570
            )
571

572
        routing_weights = [
1✔
573
            routing_configuration["weight"] for routing_configuration in routing_configuration_list
574
        ]
575
        for i, weight in enumerate(routing_weights):
1✔
576
            # TODO: check for weight type.
577
            if weight < 0:
1✔
578
                raise ValidationException(
×
579
                    f"Invalid value for parameter routingConfiguration[{i + 1}].weight, value: {weight}, valid min value: 0"
580
                )
581
            if weight > 100:
1✔
582
                raise ValidationException(
1✔
583
                    f"1 validation error detected: Value '{weight}' at 'routingConfiguration.{i + 1}.member.weight' "
584
                    "failed to satisfy constraint: Member must have value less than or equal to 100"
585
                )
586
        routing_weights_sum = sum(routing_weights)
1✔
587
        if not routing_weights_sum == 100:
1✔
588
            raise ValidationException(
1✔
589
                f"Sum of routing configuration weights must equal 100. Received: {json.dumps(routing_weights)}"
590
            )
591

592
        store = self.get_store(context=context)
1✔
593
        state_machines = store.state_machines
1✔
594

595
        first_routing_qualified_arn = routing_configuration_arn_list[0]
1✔
596
        shared_state_machine_revision_arn = self._get_state_machine_arn_from_qualified_arn(
1✔
597
            qualified_arn=first_routing_qualified_arn
598
        )
599
        for routing_configuration_arn in routing_configuration_arn_list:
1✔
600
            maybe_state_machine_version = state_machines.get(routing_configuration_arn)
1✔
601
            if not isinstance(maybe_state_machine_version, StateMachineVersion):
1✔
602
                arn_list_string = f"[{', '.join(routing_configuration_arn_list)}]"
1✔
603
                raise ValidationException(
1✔
604
                    f"Routing configuration must contain state machine version ARNs. Received: {arn_list_string}"
605
                )
606
            state_machine_revision_arn = self._get_state_machine_arn_from_qualified_arn(
1✔
607
                qualified_arn=routing_configuration_arn
608
            )
609
            if state_machine_revision_arn != shared_state_machine_revision_arn:
1✔
610
                raise ValidationException("TODO")
×
611

612
    @staticmethod
1✔
613
    def _get_state_machine_arn_from_qualified_arn(qualified_arn: Arn) -> Arn:
1✔
614
        last_colon_index = qualified_arn.rfind(":")
1✔
615
        base_arn = qualified_arn[:last_colon_index]
1✔
616
        return base_arn
1✔
617

618
    def create_state_machine_alias(
1✔
619
        self,
620
        context: RequestContext,
621
        name: CharacterRestrictedName,
622
        routing_configuration: RoutingConfigurationList,
623
        description: AliasDescription = None,
624
        **kwargs,
625
    ) -> CreateStateMachineAliasOutput:
626
        # Validate the inputs.
627
        self._validate_state_machine_alias_name(name=name)
1✔
628
        self._validate_state_machine_alias_routing_configuration(
1✔
629
            context=context, routing_configuration_list=routing_configuration
630
        )
631

632
        # Determine the state machine arn this alias maps to,
633
        # do so unsafely as validation already took place before initialisation.
634
        first_routing_qualified_arn = routing_configuration[0]["stateMachineVersionArn"]
1✔
635
        state_machine_revision_arn = self._get_state_machine_arn_from_qualified_arn(
1✔
636
            qualified_arn=first_routing_qualified_arn
637
        )
638
        alias = Alias(
1✔
639
            state_machine_arn=state_machine_revision_arn,
640
            name=name,
641
            description=description,
642
            routing_configuration_list=routing_configuration,
643
        )
644
        state_machine_alias_arn = alias.state_machine_alias_arn
1✔
645

646
        store = self.get_store(context=context)
1✔
647

648
        aliases = store.aliases
1✔
649
        if maybe_idempotent_alias := aliases.get(state_machine_alias_arn):
1✔
650
            if alias.is_idempotent(maybe_idempotent_alias):
1✔
651
                return CreateStateMachineAliasOutput(
1✔
652
                    stateMachineAliasArn=state_machine_alias_arn, creationDate=alias.create_date
653
                )
654
            else:
655
                # CreateStateMachineAlias is an idempotent API. Idempotent requests won't create duplicate resources.
656
                raise ConflictException(
1✔
657
                    "Failed to create alias because an alias with the same name and a "
658
                    "different routing configuration already exists."
659
                )
660
        aliases[state_machine_alias_arn] = alias
1✔
661

662
        state_machine_revision = store.state_machines.get(state_machine_revision_arn)
1✔
663
        if not isinstance(state_machine_revision, StateMachineRevision):
1✔
664
            # The state machine was deleted but not the version referenced in this context.
665
            raise RuntimeError(f"No state machine revision for arn '{state_machine_revision_arn}'")
×
666
        state_machine_revision.aliases.add(alias)
1✔
667

668
        return CreateStateMachineAliasOutput(
1✔
669
            stateMachineAliasArn=state_machine_alias_arn, creationDate=alias.create_date
670
        )
671

672
    def describe_state_machine(
1✔
673
        self,
674
        context: RequestContext,
675
        state_machine_arn: Arn,
676
        included_data: IncludedData = None,
677
        **kwargs,
678
    ) -> DescribeStateMachineOutput:
679
        self._validate_state_machine_arn(state_machine_arn)
1✔
680
        state_machine = self.get_store(context).state_machines.get(state_machine_arn)
1✔
681
        if state_machine is None:
1✔
682
            self._raise_state_machine_does_not_exist(state_machine_arn)
1✔
683
        return state_machine.describe()
1✔
684

685
    def describe_state_machine_alias(
1✔
686
        self, context: RequestContext, state_machine_alias_arn: Arn, **kwargs
687
    ) -> DescribeStateMachineAliasOutput:
688
        self._validate_state_machine_alias_arn(state_machine_alias_arn=state_machine_alias_arn)
1✔
689
        alias: Alias | None = self.get_store(context=context).aliases.get(state_machine_alias_arn)
1✔
690
        if alias is None:
1✔
691
            # TODO: assemble the correct exception
692
            raise ValidationException()
×
693
        description = alias.to_description()
1✔
694
        return description
1✔
695

696
    def describe_state_machine_for_execution(
1✔
697
        self,
698
        context: RequestContext,
699
        execution_arn: Arn,
700
        included_data: IncludedData = None,
701
        **kwargs,
702
    ) -> DescribeStateMachineForExecutionOutput:
703
        self._validate_state_machine_execution_arn(execution_arn)
1✔
704
        execution: Execution = self._get_execution(context=context, execution_arn=execution_arn)
1✔
705
        return execution.to_describe_state_machine_for_execution_output()
1✔
706

707
    def send_task_heartbeat(
1✔
708
        self, context: RequestContext, task_token: TaskToken, **kwargs
709
    ) -> SendTaskHeartbeatOutput:
710
        running_executions: list[Execution] = self._get_executions(context, ExecutionStatus.RUNNING)
1✔
711
        for execution in running_executions:
1✔
712
            try:
1✔
713
                if execution.exec_worker.env.callback_pool_manager.heartbeat(
1✔
714
                    callback_id=task_token
715
                ):
716
                    return SendTaskHeartbeatOutput()
1✔
717
            except CallbackNotifyConsumerError as consumer_error:
×
718
                if isinstance(consumer_error, CallbackConsumerTimeout):
×
719
                    raise TaskTimedOut()
×
720
                else:
721
                    raise TaskDoesNotExist()
×
722
        raise InvalidToken()
×
723

724
    def send_task_success(
1✔
725
        self,
726
        context: RequestContext,
727
        task_token: TaskToken,
728
        output: SensitiveData,
729
        **kwargs,
730
    ) -> SendTaskSuccessOutput:
731
        outcome = CallbackOutcomeSuccess(callback_id=task_token, output=output)
1✔
732
        running_executions: list[Execution] = self._get_executions(context, ExecutionStatus.RUNNING)
1✔
733
        for execution in running_executions:
1✔
734
            try:
1✔
735
                if execution.exec_worker.env.callback_pool_manager.notify(
1✔
736
                    callback_id=task_token, outcome=outcome
737
                ):
738
                    return SendTaskSuccessOutput()
1✔
739
            except CallbackNotifyConsumerError as consumer_error:
×
740
                if isinstance(consumer_error, CallbackConsumerTimeout):
×
741
                    raise TaskTimedOut()
×
742
                else:
743
                    raise TaskDoesNotExist()
×
744
        raise InvalidToken("Invalid token")
1✔
745

746
    def send_task_failure(
1✔
747
        self,
748
        context: RequestContext,
749
        task_token: TaskToken,
750
        error: SensitiveError = None,
751
        cause: SensitiveCause = None,
752
        **kwargs,
753
    ) -> SendTaskFailureOutput:
754
        outcome = CallbackOutcomeFailure(callback_id=task_token, error=error, cause=cause)
1✔
755
        store = self.get_store(context)
1✔
756
        for execution in store.executions.values():
1✔
757
            try:
1✔
758
                if execution.exec_worker.env.callback_pool_manager.notify(
1✔
759
                    callback_id=task_token, outcome=outcome
760
                ):
761
                    return SendTaskFailureOutput()
1✔
762
            except CallbackNotifyConsumerError as consumer_error:
×
763
                if isinstance(consumer_error, CallbackConsumerTimeout):
×
764
                    raise TaskTimedOut()
×
765
                else:
766
                    raise TaskDoesNotExist()
×
767
        raise InvalidToken("Invalid token")
1✔
768

769
    @staticmethod
1✔
770
    def _get_state_machine_arn(state_machine_arn: str) -> str:
1✔
771
        """Extract the state machine ARN by removing the test case suffix."""
772
        return state_machine_arn.split("#")[0]
1✔
773

774
    @staticmethod
1✔
775
    def _get_mock_test_case(state_machine_arn: str, state_machine_name: str) -> MockTestCase | None:
1✔
776
        """Extract and load a mock test case from a state machine ARN if present."""
777
        parts = state_machine_arn.split("#")
1✔
778
        if len(parts) != 2:
1✔
779
            return None
1✔
780

781
        mock_test_case_name = parts[1]
1✔
782
        mock_test_case = load_mock_test_case_for(
1✔
783
            state_machine_name=state_machine_name, test_case_name=mock_test_case_name
784
        )
785
        if mock_test_case is None:
1✔
786
            raise InvalidName(
×
787
                f"Invalid mock test case name '{mock_test_case_name}' "
788
                f"for state machine '{state_machine_name}'."
789
                "Either the test case is not defined or the mock configuration file "
790
                "could not be loaded. See logs for details."
791
            )
792
        return mock_test_case
1✔
793

794
    def start_execution(
1✔
795
        self,
796
        context: RequestContext,
797
        state_machine_arn: Arn,
798
        name: Name = None,
799
        input: SensitiveData = None,
800
        trace_header: TraceHeader = None,
801
        **kwargs,
802
    ) -> StartExecutionOutput:
803
        self._validate_state_machine_arn(state_machine_arn)
1✔
804

805
        base_arn = self._get_state_machine_arn(state_machine_arn)
1✔
806
        store = self.get_store(context=context)
1✔
807

808
        alias: Alias | None = store.aliases.get(base_arn)
1✔
809
        alias_sample_state_machine_version_arn = alias.sample() if alias is not None else None
1✔
810
        unsafe_state_machine: StateMachineInstance | None = store.state_machines.get(
1✔
811
            alias_sample_state_machine_version_arn or base_arn
812
        )
813
        if not unsafe_state_machine:
1✔
814
            self._raise_state_machine_does_not_exist(base_arn)
1✔
815

816
        # Update event change parameters about the state machine and should not affect those about this execution.
817
        state_machine_clone = copy.deepcopy(unsafe_state_machine)
1✔
818

819
        if input is None:
1✔
820
            input_data = {}
1✔
821
        else:
822
            try:
1✔
823
                input_data = json.loads(input)
1✔
824
            except Exception as ex:
1✔
825
                raise InvalidExecutionInput(str(ex))  # TODO: report parsing error like AWS.
1✔
826

827
        normalised_state_machine_arn = (
1✔
828
            state_machine_clone.source_arn
829
            if isinstance(state_machine_clone, StateMachineVersion)
830
            else state_machine_clone.arn
831
        )
832
        exec_name = name or long_uid()  # TODO: validate name format
1✔
833
        if state_machine_clone.sm_type == StateMachineType.STANDARD:
1✔
834
            exec_arn = stepfunctions_standard_execution_arn(normalised_state_machine_arn, exec_name)
1✔
835
        else:
836
            # Exhaustive check on STANDARD and EXPRESS type, validated on creation.
837
            exec_arn = stepfunctions_express_execution_arn(normalised_state_machine_arn, exec_name)
1✔
838

839
        if execution := store.executions.get(exec_arn):
1✔
840
            # Return already running execution if name and input match
841
            existing_execution = self._idempotent_start_execution(
1✔
842
                execution=execution,
843
                state_machine=state_machine_clone,
844
                name=name,
845
                input_data=input_data,
846
            )
847

848
            if existing_execution:
1✔
849
                return existing_execution.to_start_output()
1✔
850

851
        # Create the execution logging session, if logging is configured.
852
        cloud_watch_logging_session = None
1✔
853
        if state_machine_clone.cloud_watch_logging_configuration is not None:
1✔
854
            cloud_watch_logging_session = CloudWatchLoggingSession(
1✔
855
                execution_arn=exec_arn,
856
                configuration=state_machine_clone.cloud_watch_logging_configuration,
857
            )
858

859
        mock_test_case = self._get_mock_test_case(state_machine_arn, state_machine_clone.name)
1✔
860

861
        execution = Execution(
1✔
862
            name=exec_name,
863
            sm_type=state_machine_clone.sm_type,
864
            role_arn=state_machine_clone.role_arn,
865
            exec_arn=exec_arn,
866
            account_id=context.account_id,
867
            region_name=context.region,
868
            state_machine=state_machine_clone,
869
            state_machine_alias_arn=alias.state_machine_alias_arn if alias is not None else None,
870
            start_date=datetime.datetime.now(tz=datetime.UTC),
871
            cloud_watch_logging_session=cloud_watch_logging_session,
872
            input_data=input_data,
873
            trace_header=trace_header,
874
            activity_store=self.get_store(context).activities,
875
            mock_test_case=mock_test_case,
876
        )
877

878
        store.executions[exec_arn] = execution
1✔
879

880
        execution.start()
1✔
881
        return execution.to_start_output()
1✔
882

883
    def start_sync_execution(
1✔
884
        self,
885
        context: RequestContext,
886
        state_machine_arn: Arn,
887
        name: Name = None,
888
        input: SensitiveData = None,
889
        trace_header: TraceHeader = None,
890
        included_data: IncludedData = None,
891
        **kwargs,
892
    ) -> StartSyncExecutionOutput:
893
        self._validate_state_machine_arn(state_machine_arn)
1✔
894

895
        base_arn = self._get_state_machine_arn(state_machine_arn)
1✔
896
        unsafe_state_machine: StateMachineInstance | None = self.get_store(
1✔
897
            context
898
        ).state_machines.get(base_arn)
899
        if not unsafe_state_machine:
1✔
900
            self._raise_state_machine_does_not_exist(base_arn)
1✔
901

902
        if unsafe_state_machine.sm_type == StateMachineType.STANDARD:
1✔
903
            self._raise_state_machine_type_not_supported()
1✔
904

905
        # Update event change parameters about the state machine and should not affect those about this execution.
906
        state_machine_clone = copy.deepcopy(unsafe_state_machine)
1✔
907

908
        if input is None:
1✔
909
            input_data = {}
×
910
        else:
911
            try:
1✔
912
                input_data = json.loads(input)
1✔
913
            except Exception as ex:
×
914
                raise InvalidExecutionInput(str(ex))  # TODO: report parsing error like AWS.
×
915

916
        normalised_state_machine_arn = (
1✔
917
            state_machine_clone.source_arn
918
            if isinstance(state_machine_clone, StateMachineVersion)
919
            else state_machine_clone.arn
920
        )
921
        exec_name = name or long_uid()  # TODO: validate name format
1✔
922
        exec_arn = stepfunctions_express_execution_arn(normalised_state_machine_arn, exec_name)
1✔
923

924
        if exec_arn in self.get_store(context).executions:
1✔
925
            raise InvalidName()  # TODO
×
926

927
        # Create the execution logging session, if logging is configured.
928
        cloud_watch_logging_session = None
1✔
929
        if state_machine_clone.cloud_watch_logging_configuration is not None:
1✔
930
            cloud_watch_logging_session = CloudWatchLoggingSession(
×
931
                execution_arn=exec_arn,
932
                configuration=state_machine_clone.cloud_watch_logging_configuration,
933
            )
934

935
        mock_test_case = self._get_mock_test_case(state_machine_arn, state_machine_clone.name)
1✔
936

937
        execution = SyncExecution(
1✔
938
            name=exec_name,
939
            sm_type=state_machine_clone.sm_type,
940
            role_arn=state_machine_clone.role_arn,
941
            exec_arn=exec_arn,
942
            account_id=context.account_id,
943
            region_name=context.region,
944
            state_machine=state_machine_clone,
945
            start_date=datetime.datetime.now(tz=datetime.UTC),
946
            cloud_watch_logging_session=cloud_watch_logging_session,
947
            input_data=input_data,
948
            trace_header=trace_header,
949
            activity_store=self.get_store(context).activities,
950
            mock_test_case=mock_test_case,
951
        )
952
        self.get_store(context).executions[exec_arn] = execution
1✔
953

954
        execution.start()
1✔
955
        return execution.to_start_sync_execution_output()
1✔
956

957
    def describe_execution(
1✔
958
        self,
959
        context: RequestContext,
960
        execution_arn: Arn,
961
        included_data: IncludedData = None,
962
        **kwargs,
963
    ) -> DescribeExecutionOutput:
964
        self._validate_state_machine_execution_arn(execution_arn)
1✔
965
        execution: Execution = self._get_execution(context=context, execution_arn=execution_arn)
1✔
966

967
        # Action only compatible with STANDARD workflows.
968
        if execution.sm_type != StateMachineType.STANDARD:
1✔
969
            self._raise_resource_type_not_in_context(resource_type=execution.sm_type)
1✔
970

971
        return execution.to_describe_output()
1✔
972

973
    @staticmethod
1✔
974
    def _list_execution_filter(
1✔
975
        ex: Execution, state_machine_arn: str, status_filter: str | None
976
    ) -> bool:
977
        state_machine_reference_arn_set = {ex.state_machine_arn, ex.state_machine_version_arn}
1✔
978
        if state_machine_arn not in state_machine_reference_arn_set:
1✔
979
            return False
1✔
980

981
        if not status_filter:
1✔
982
            return True
1✔
983
        return ex.exec_status == status_filter
1✔
984

985
    def list_executions(
1✔
986
        self,
987
        context: RequestContext,
988
        state_machine_arn: Arn = None,
989
        status_filter: ExecutionStatus = None,
990
        max_results: PageSize = None,
991
        next_token: ListExecutionsPageToken = None,
992
        map_run_arn: LongArn = None,
993
        redrive_filter: ExecutionRedriveFilter = None,
994
        **kwargs,
995
    ) -> ListExecutionsOutput:
996
        self._validate_state_machine_arn(state_machine_arn)
1✔
997
        assert_pagination_parameters_valid(
1✔
998
            max_results=max_results,
999
            next_token=next_token,
1000
            next_token_length_limit=3096,
1001
        )
1002
        max_results = normalise_max_results(max_results)
1✔
1003

1004
        state_machine = self.get_store(context).state_machines.get(state_machine_arn)
1✔
1005
        if state_machine is None:
1✔
1006
            self._raise_state_machine_does_not_exist(state_machine_arn)
1✔
1007

1008
        if state_machine.sm_type != StateMachineType.STANDARD:
1✔
1009
            self._raise_state_machine_type_not_supported()
1✔
1010

1011
        # TODO: add support for paging
1012

1013
        allowed_execution_status = [
1✔
1014
            ExecutionStatus.SUCCEEDED,
1015
            ExecutionStatus.TIMED_OUT,
1016
            ExecutionStatus.PENDING_REDRIVE,
1017
            ExecutionStatus.ABORTED,
1018
            ExecutionStatus.FAILED,
1019
            ExecutionStatus.RUNNING,
1020
        ]
1021

1022
        validation_errors = []
1✔
1023

1024
        if status_filter and status_filter not in allowed_execution_status:
1✔
1025
            validation_errors.append(
1✔
1026
                f"Value '{status_filter}' at 'statusFilter' failed to satisfy constraint: Member must satisfy enum value set: [{', '.join(allowed_execution_status)}]"
1027
            )
1028

1029
        if not state_machine_arn and not map_run_arn:
1✔
1030
            validation_errors.append("Must provide a StateMachine ARN or MapRun ARN")
×
1031

1032
        if validation_errors:
1✔
1033
            errors_message = "; ".join(validation_errors)
1✔
1034
            message = f"{len(validation_errors)} validation {'errors' if len(validation_errors) > 1 else 'error'} detected: {errors_message}"
1✔
1035
            raise CommonServiceException(message=message, code="ValidationException")
1✔
1036

1037
        executions: ExecutionList = [
1✔
1038
            execution.to_execution_list_item()
1039
            for execution in self.get_store(context).executions.values()
1040
            if self._list_execution_filter(
1041
                execution,
1042
                state_machine_arn=state_machine_arn,
1043
                status_filter=status_filter,
1044
            )
1045
        ]
1046

1047
        executions.sort(key=lambda item: item["startDate"], reverse=True)
1✔
1048

1049
        paginated_executions = PaginatedList(executions)
1✔
1050
        page, token_for_next_page = paginated_executions.get_page(
1✔
1051
            token_generator=lambda item: get_next_page_token_from_arn(item.get("executionArn")),
1052
            page_size=max_results,
1053
            next_token=next_token,
1054
        )
1055

1056
        return ListExecutionsOutput(executions=page, nextToken=token_for_next_page)
1✔
1057

1058
    def list_state_machines(
1✔
1059
        self,
1060
        context: RequestContext,
1061
        max_results: PageSize = None,
1062
        next_token: PageToken = None,
1063
        **kwargs,
1064
    ) -> ListStateMachinesOutput:
1065
        assert_pagination_parameters_valid(max_results, next_token)
1✔
1066
        max_results = normalise_max_results(max_results)
1✔
1067

1068
        state_machines: StateMachineList = [
1✔
1069
            sm.itemise()
1070
            for sm in self.get_store(context).state_machines.values()
1071
            if isinstance(sm, StateMachineRevision)
1072
        ]
1073
        state_machines.sort(key=lambda item: item["name"])
1✔
1074

1075
        paginated_state_machines = PaginatedList(state_machines)
1✔
1076
        page, token_for_next_page = paginated_state_machines.get_page(
1✔
1077
            token_generator=lambda item: get_next_page_token_from_arn(item.get("stateMachineArn")),
1078
            page_size=max_results,
1079
            next_token=next_token,
1080
        )
1081

1082
        return ListStateMachinesOutput(stateMachines=page, nextToken=token_for_next_page)
1✔
1083

1084
    def list_state_machine_aliases(
1✔
1085
        self,
1086
        context: RequestContext,
1087
        state_machine_arn: Arn,
1088
        next_token: PageToken = None,
1089
        max_results: PageSize = None,
1090
        **kwargs,
1091
    ) -> ListStateMachineAliasesOutput:
1092
        assert_pagination_parameters_valid(max_results, next_token)
1✔
1093

1094
        self._validate_state_machine_arn(state_machine_arn)
1✔
1095
        state_machines = self.get_store(context).state_machines
1✔
1096
        state_machine_revision = state_machines.get(state_machine_arn)
1✔
1097
        if not isinstance(state_machine_revision, StateMachineRevision):
1✔
1098
            raise InvalidArn(f"Invalid arn: {state_machine_arn}")
×
1099

1100
        state_machine_aliases: StateMachineAliasList = []
1✔
1101
        valid_token_found = next_token is None
1✔
1102

1103
        for alias in state_machine_revision.aliases:
1✔
1104
            state_machine_aliases.append(alias.to_item())
1✔
1105
            if alias.tokenized_state_machine_alias_arn == next_token:
1✔
1106
                valid_token_found = True
1✔
1107

1108
        if not valid_token_found:
1✔
1109
            raise InvalidToken("Invalid Token: 'Invalid token'")
1✔
1110

1111
        state_machine_aliases.sort(key=lambda item: item["creationDate"])
1✔
1112

1113
        paginated_list = PaginatedList(state_machine_aliases)
1✔
1114

1115
        paginated_aliases, next_token = paginated_list.get_page(
1✔
1116
            token_generator=lambda item: get_next_page_token_from_arn(
1117
                item.get("stateMachineAliasArn")
1118
            ),
1119
            next_token=next_token,
1120
            page_size=100 if max_results == 0 or max_results is None else max_results,
1121
        )
1122

1123
        return ListStateMachineAliasesOutput(
1✔
1124
            stateMachineAliases=paginated_aliases, nextToken=next_token
1125
        )
1126

1127
    def list_state_machine_versions(
1✔
1128
        self,
1129
        context: RequestContext,
1130
        state_machine_arn: Arn,
1131
        next_token: PageToken = None,
1132
        max_results: PageSize = None,
1133
        **kwargs,
1134
    ) -> ListStateMachineVersionsOutput:
1135
        self._validate_state_machine_arn(state_machine_arn)
1✔
1136
        assert_pagination_parameters_valid(max_results, next_token)
1✔
1137
        max_results = normalise_max_results(max_results)
1✔
1138

1139
        state_machines = self.get_store(context).state_machines
1✔
1140
        state_machine_revision = state_machines.get(state_machine_arn)
1✔
1141
        if not isinstance(state_machine_revision, StateMachineRevision):
1✔
1142
            raise InvalidArn(f"Invalid arn: {state_machine_arn}")
×
1143

1144
        state_machine_version_items = []
1✔
1145
        for version_arn in state_machine_revision.versions.values():
1✔
1146
            state_machine_version = state_machines[version_arn]
1✔
1147
            if isinstance(state_machine_version, StateMachineVersion):
1✔
1148
                state_machine_version_items.append(state_machine_version.itemise())
1✔
1149
            else:
1150
                raise RuntimeError(
×
1151
                    f"Expected {version_arn} to be a StateMachine Version, but got '{type(state_machine_version)}'."
1152
                )
1153

1154
        state_machine_version_items.sort(key=lambda item: item["creationDate"], reverse=True)
1✔
1155

1156
        paginated_state_machine_versions = PaginatedList(state_machine_version_items)
1✔
1157
        page, token_for_next_page = paginated_state_machine_versions.get_page(
1✔
1158
            token_generator=lambda item: get_next_page_token_from_arn(
1159
                item.get("stateMachineVersionArn")
1160
            ),
1161
            page_size=max_results,
1162
            next_token=next_token,
1163
        )
1164

1165
        return ListStateMachineVersionsOutput(
1✔
1166
            stateMachineVersions=page, nextToken=token_for_next_page
1167
        )
1168

1169
    def get_execution_history(
1✔
1170
        self,
1171
        context: RequestContext,
1172
        execution_arn: Arn,
1173
        max_results: PageSize = None,
1174
        reverse_order: ReverseOrder = None,
1175
        next_token: PageToken = None,
1176
        include_execution_data: IncludeExecutionDataGetExecutionHistory = None,
1177
        **kwargs,
1178
    ) -> GetExecutionHistoryOutput:
1179
        # TODO: add support for paging, ordering, and other manipulations.
1180
        self._validate_state_machine_execution_arn(execution_arn)
1✔
1181
        execution: Execution = self._get_execution(context=context, execution_arn=execution_arn)
1✔
1182

1183
        # Action only compatible with STANDARD workflows.
1184
        if execution.sm_type != StateMachineType.STANDARD:
1✔
1185
            self._raise_resource_type_not_in_context(resource_type=execution.sm_type)
1✔
1186

1187
        history: GetExecutionHistoryOutput = execution.to_history_output()
1✔
1188
        if reverse_order:
1✔
1189
            history["events"].reverse()
1✔
1190
        return history
1✔
1191

1192
    def delete_state_machine(
1✔
1193
        self, context: RequestContext, state_machine_arn: Arn, **kwargs
1194
    ) -> DeleteStateMachineOutput:
1195
        # TODO: halt executions?
1196
        self._validate_state_machine_arn(state_machine_arn)
1✔
1197
        state_machines = self.get_store(context).state_machines
1✔
1198
        state_machine = state_machines.get(state_machine_arn)
1✔
1199
        if isinstance(state_machine, StateMachineRevision):
1✔
1200
            state_machines.pop(state_machine_arn)
1✔
1201
            for version_arn in state_machine.versions.values():
1✔
1202
                state_machines.pop(version_arn, None)
1✔
1203
        return DeleteStateMachineOutput()
1✔
1204

1205
    def delete_state_machine_alias(
1✔
1206
        self, context: RequestContext, state_machine_alias_arn: Arn, **kwargs
1207
    ) -> DeleteStateMachineAliasOutput:
1208
        self._validate_state_machine_alias_arn(state_machine_alias_arn=state_machine_alias_arn)
1✔
1209
        store = self.get_store(context=context)
1✔
1210
        aliases = store.aliases
1✔
1211
        if (alias := aliases.pop(state_machine_alias_arn, None)) is not None:
1✔
1212
            state_machines = store.state_machines
1✔
1213
            for routing_configuration in alias.get_routing_configuration_list():
1✔
1214
                state_machine_version_arn = routing_configuration["stateMachineVersionArn"]
1✔
1215
                if (
1✔
1216
                    state_machine_version := state_machines.get(state_machine_version_arn)
1217
                ) is None or not isinstance(state_machine_version, StateMachineVersion):
1218
                    continue
1✔
1219
                if (
1✔
1220
                    state_machine_revision := state_machines.get(state_machine_version.source_arn)
1221
                ) is None or not isinstance(state_machine_revision, StateMachineRevision):
1222
                    continue
×
1223
                state_machine_revision.aliases.discard(alias)
1✔
1224
        return DeleteStateMachineOutput()
1✔
1225

1226
    def delete_state_machine_version(
1✔
1227
        self, context: RequestContext, state_machine_version_arn: LongArn, **kwargs
1228
    ) -> DeleteStateMachineVersionOutput:
1229
        self._validate_state_machine_arn(state_machine_version_arn)
1✔
1230
        state_machines = self.get_store(context).state_machines
1✔
1231

1232
        if not (
1✔
1233
            state_machine_version := state_machines.get(state_machine_version_arn)
1234
        ) or not isinstance(state_machine_version, StateMachineVersion):
1235
            return DeleteStateMachineVersionOutput()
1✔
1236

1237
        if (
1✔
1238
            state_machine_revision := state_machines.get(state_machine_version.source_arn)
1239
        ) and isinstance(state_machine_revision, StateMachineRevision):
1240
            referencing_alias_names: list[str] = []
1✔
1241
            for alias in state_machine_revision.aliases:
1✔
1242
                if alias.is_router_for(state_machine_version_arn=state_machine_version_arn):
1✔
1243
                    referencing_alias_names.append(alias.name)
1✔
1244
            if referencing_alias_names:
1✔
1245
                referencing_alias_names_list_body = ", ".join(referencing_alias_names)
1✔
1246
                raise ConflictException(
1✔
1247
                    "Version to be deleted must not be referenced by an alias. "
1248
                    f"Current list of aliases referencing this version: [{referencing_alias_names_list_body}]"
1249
                )
1250
            state_machine_revision.delete_version(state_machine_version_arn)
1✔
1251

1252
        state_machines.pop(state_machine_version.arn, None)
1✔
1253
        return DeleteStateMachineVersionOutput()
1✔
1254

1255
    def stop_execution(
1✔
1256
        self,
1257
        context: RequestContext,
1258
        execution_arn: Arn,
1259
        error: SensitiveError = None,
1260
        cause: SensitiveCause = None,
1261
        **kwargs,
1262
    ) -> StopExecutionOutput:
1263
        self._validate_state_machine_execution_arn(execution_arn)
1✔
1264
        execution: Execution = self._get_execution(context=context, execution_arn=execution_arn)
1✔
1265

1266
        # Action only compatible with STANDARD workflows.
1267
        if execution.sm_type != StateMachineType.STANDARD:
1✔
1268
            self._raise_resource_type_not_in_context(resource_type=execution.sm_type)
1✔
1269

1270
        stop_date = datetime.datetime.now(tz=datetime.UTC)
1✔
1271
        execution.stop(stop_date=stop_date, cause=cause, error=error)
1✔
1272
        return StopExecutionOutput(stopDate=stop_date)
1✔
1273

1274
    def update_state_machine(
1✔
1275
        self,
1276
        context: RequestContext,
1277
        state_machine_arn: Arn,
1278
        definition: Definition = None,
1279
        role_arn: Arn = None,
1280
        logging_configuration: LoggingConfiguration = None,
1281
        tracing_configuration: TracingConfiguration = None,
1282
        publish: Publish = None,
1283
        version_description: VersionDescription = None,
1284
        encryption_configuration: EncryptionConfiguration = None,
1285
        **kwargs,
1286
    ) -> UpdateStateMachineOutput:
1287
        self._validate_state_machine_arn(state_machine_arn)
1✔
1288
        state_machines = self.get_store(context).state_machines
1✔
1289

1290
        state_machine = state_machines.get(state_machine_arn)
1✔
1291
        if not isinstance(state_machine, StateMachineRevision):
1✔
1292
            self._raise_state_machine_does_not_exist(state_machine_arn)
1✔
1293

1294
        # TODO: Add logic to handle metrics for when SFN definitions update
1295
        if not any([definition, role_arn, logging_configuration]):
1✔
1296
            raise MissingRequiredParameter(
1✔
1297
                "Either the definition, the role ARN, the LoggingConfiguration, "
1298
                "or the TracingConfiguration must be specified"
1299
            )
1300

1301
        if definition is not None:
1✔
1302
            self._validate_definition(definition=definition, static_analysers=[StaticAnalyser()])
1✔
1303

1304
        if logging_configuration is not None:
1✔
1305
            self._sanitise_logging_configuration(logging_configuration=logging_configuration)
1✔
1306

1307
        revision_id = state_machine.create_revision(
1✔
1308
            definition=definition,
1309
            role_arn=role_arn,
1310
            logging_configuration=logging_configuration,
1311
        )
1312

1313
        version_arn = None
1✔
1314
        if publish:
1✔
1315
            version = state_machine.create_version(description=version_description)
1✔
1316
            if version is not None:
1✔
1317
                version_arn = version.arn
1✔
1318
                state_machines[version_arn] = version
1✔
1319
            else:
1320
                target_revision_id = revision_id or state_machine.revision_id
1✔
1321
                version_arn = state_machine.versions[target_revision_id]
1✔
1322

1323
        update_output = UpdateStateMachineOutput(updateDate=datetime.datetime.now(tz=datetime.UTC))
1✔
1324
        if revision_id is not None:
1✔
1325
            update_output["revisionId"] = revision_id
1✔
1326
        if version_arn is not None:
1✔
1327
            update_output["stateMachineVersionArn"] = version_arn
1✔
1328
        return update_output
1✔
1329

1330
    def update_state_machine_alias(
1✔
1331
        self,
1332
        context: RequestContext,
1333
        state_machine_alias_arn: Arn,
1334
        description: AliasDescription = None,
1335
        routing_configuration: RoutingConfigurationList = None,
1336
        **kwargs,
1337
    ) -> UpdateStateMachineAliasOutput:
1338
        self._validate_state_machine_alias_arn(state_machine_alias_arn=state_machine_alias_arn)
1✔
1339
        if not any([description, routing_configuration]):
1✔
1340
            raise MissingRequiredParameter(
×
1341
                "Either the description or the RoutingConfiguration must be specified"
1342
            )
1343
        if routing_configuration is not None:
1✔
1344
            self._validate_state_machine_alias_routing_configuration(
1✔
1345
                context=context, routing_configuration_list=routing_configuration
1346
            )
1347
        store = self.get_store(context=context)
1✔
1348
        alias = store.aliases.get(state_machine_alias_arn)
1✔
1349
        if alias is None:
1✔
1350
            raise ResourceNotFound("Request references a resource that does not exist.")
1✔
1351

1352
        alias.update(description=description, routing_configuration_list=routing_configuration)
1✔
1353
        return UpdateStateMachineAliasOutput(updateDate=alias.update_date)
1✔
1354

1355
    def publish_state_machine_version(
1✔
1356
        self,
1357
        context: RequestContext,
1358
        state_machine_arn: Arn,
1359
        revision_id: RevisionId = None,
1360
        description: VersionDescription = None,
1361
        **kwargs,
1362
    ) -> PublishStateMachineVersionOutput:
1363
        self._validate_state_machine_arn(state_machine_arn)
1✔
1364
        state_machines = self.get_store(context).state_machines
1✔
1365

1366
        state_machine_revision = state_machines.get(state_machine_arn)
1✔
1367
        if not isinstance(state_machine_revision, StateMachineRevision):
1✔
1368
            self._raise_state_machine_does_not_exist(state_machine_arn)
1✔
1369

1370
        if revision_id is not None and state_machine_revision.revision_id != revision_id:
1✔
1371
            raise ConflictException(
1✔
1372
                f"Failed to publish the State Machine version for revision {revision_id}. "
1373
                f"The current State Machine revision is {state_machine_revision.revision_id}."
1374
            )
1375

1376
        state_machine_version = state_machine_revision.create_version(description=description)
1✔
1377
        if state_machine_version is not None:
1✔
1378
            state_machines[state_machine_version.arn] = state_machine_version
1✔
1379
        else:
1380
            target_revision_id = revision_id or state_machine_revision.revision_id
1✔
1381
            state_machine_version_arn = state_machine_revision.versions.get(target_revision_id)
1✔
1382
            state_machine_version = state_machines[state_machine_version_arn]
1✔
1383

1384
        return PublishStateMachineVersionOutput(
1✔
1385
            creationDate=state_machine_version.create_date,
1386
            stateMachineVersionArn=state_machine_version.arn,
1387
        )
1388

1389
    def tag_resource(
1✔
1390
        self, context: RequestContext, resource_arn: Arn, tags: TagList, **kwargs
1391
    ) -> TagResourceOutput:
1392
        # TODO: add tagging for activities.
1393
        state_machines = self.get_store(context).state_machines
1✔
1394
        state_machine = state_machines.get(resource_arn)
1✔
1395
        if not isinstance(state_machine, StateMachineRevision):
1✔
1396
            raise ResourceNotFound(f"Resource not found: '{resource_arn}'")
1✔
1397

1398
        state_machine.tag_manager.add_all(tags)
1✔
1399
        return TagResourceOutput()
1✔
1400

1401
    def untag_resource(
1✔
1402
        self, context: RequestContext, resource_arn: Arn, tag_keys: TagKeyList, **kwargs
1403
    ) -> UntagResourceOutput:
1404
        # TODO: add untagging for activities.
1405
        state_machines = self.get_store(context).state_machines
1✔
1406
        state_machine = state_machines.get(resource_arn)
1✔
1407
        if not isinstance(state_machine, StateMachineRevision):
1✔
1408
            raise ResourceNotFound(f"Resource not found: '{resource_arn}'")
×
1409

1410
        state_machine.tag_manager.remove_all(tag_keys)
1✔
1411
        return UntagResourceOutput()
1✔
1412

1413
    def list_tags_for_resource(
1✔
1414
        self, context: RequestContext, resource_arn: Arn, **kwargs
1415
    ) -> ListTagsForResourceOutput:
1416
        # TODO: add untagging for activities.
1417
        state_machines = self.get_store(context).state_machines
1✔
1418
        state_machine = state_machines.get(resource_arn)
1✔
1419
        if not isinstance(state_machine, StateMachineRevision):
1✔
1420
            raise ResourceNotFound(f"Resource not found: '{resource_arn}'")
×
1421

1422
        tags: TagList = state_machine.tag_manager.to_tag_list()
1✔
1423
        return ListTagsForResourceOutput(tags=tags)
1✔
1424

1425
    def describe_map_run(
1✔
1426
        self, context: RequestContext, map_run_arn: LongArn, **kwargs
1427
    ) -> DescribeMapRunOutput:
1428
        store = self.get_store(context)
1✔
1429
        for execution in store.executions.values():
1✔
1430
            map_run_record: MapRunRecord | None = (
1✔
1431
                execution.exec_worker.env.map_run_record_pool_manager.get(map_run_arn)
1432
            )
1433
            if map_run_record is not None:
1✔
1434
                return map_run_record.describe()
1✔
1435
        raise ResourceNotFound()
×
1436

1437
    def list_map_runs(
1✔
1438
        self,
1439
        context: RequestContext,
1440
        execution_arn: Arn,
1441
        max_results: PageSize = None,
1442
        next_token: PageToken = None,
1443
        **kwargs,
1444
    ) -> ListMapRunsOutput:
1445
        # TODO: add support for paging.
1446
        execution = self._get_execution(context=context, execution_arn=execution_arn)
1✔
1447
        map_run_records: list[MapRunRecord] = (
1✔
1448
            execution.exec_worker.env.map_run_record_pool_manager.get_all()
1449
        )
1450
        return ListMapRunsOutput(
1✔
1451
            mapRuns=[map_run_record.list_item() for map_run_record in map_run_records]
1452
        )
1453

1454
    def update_map_run(
1✔
1455
        self,
1456
        context: RequestContext,
1457
        map_run_arn: LongArn,
1458
        max_concurrency: MaxConcurrency = None,
1459
        tolerated_failure_percentage: ToleratedFailurePercentage = None,
1460
        tolerated_failure_count: ToleratedFailureCount = None,
1461
        **kwargs,
1462
    ) -> UpdateMapRunOutput:
1463
        if tolerated_failure_percentage is not None or tolerated_failure_count is not None:
×
1464
            raise NotImplementedError(
1465
                "Updating of ToleratedFailureCount and ToleratedFailurePercentage is currently unsupported."
1466
            )
1467
        # TODO: investigate behaviour of empty requests.
1468
        store = self.get_store(context)
×
1469
        for execution in store.executions.values():
×
1470
            map_run_record: MapRunRecord | None = (
×
1471
                execution.exec_worker.env.map_run_record_pool_manager.get(map_run_arn)
1472
            )
1473
            if map_run_record is not None:
×
1474
                map_run_record.update(
×
1475
                    max_concurrency=max_concurrency,
1476
                    tolerated_failure_count=tolerated_failure_count,
1477
                    tolerated_failure_percentage=tolerated_failure_percentage,
1478
                )
1479
                LOG.warning(
×
1480
                    "StepFunctions UpdateMapRun changes are currently not being reflected in the MapRun instances."
1481
                )
1482
                return UpdateMapRunOutput()
×
1483
        raise ResourceNotFound()
×
1484

1485
    def test_state(
1✔
1486
        self, context: RequestContext, request: TestStateInput, **kwargs
1487
    ) -> TestStateOutput:
1488
        StepFunctionsProvider._validate_definition(
1✔
1489
            definition=request["definition"], static_analysers=[TestStateStaticAnalyser()]
1490
        )
1491

1492
        name: Name | None = f"TestState-{short_uid()}"
1✔
1493
        arn = stepfunctions_state_machine_arn(
1✔
1494
            name=name, account_id=context.account_id, region_name=context.region
1495
        )
1496
        state_machine = TestStateMachine(
1✔
1497
            name=name,
1498
            arn=arn,
1499
            role_arn=request["roleArn"],
1500
            definition=request["definition"],
1501
        )
1502
        exec_arn = stepfunctions_standard_execution_arn(state_machine.arn, name)
1✔
1503

1504
        input_json = json.loads(request["input"])
1✔
1505
        execution = TestStateExecution(
1✔
1506
            name=name,
1507
            role_arn=request["roleArn"],
1508
            exec_arn=exec_arn,
1509
            account_id=context.account_id,
1510
            region_name=context.region,
1511
            state_machine=state_machine,
1512
            start_date=datetime.datetime.now(tz=datetime.UTC),
1513
            input_data=input_json,
1514
            activity_store=self.get_store(context).activities,
1515
        )
1516
        execution.start()
1✔
1517

1518
        test_state_output = execution.to_test_state_output(
1✔
1519
            inspection_level=request.get("inspectionLevel", InspectionLevel.INFO)
1520
        )
1521

1522
        return test_state_output
1✔
1523

1524
    def create_activity(
1✔
1525
        self,
1526
        context: RequestContext,
1527
        name: Name,
1528
        tags: TagList = None,
1529
        encryption_configuration: EncryptionConfiguration = None,
1530
        **kwargs,
1531
    ) -> CreateActivityOutput:
1532
        self._validate_activity_name(name=name)
1✔
1533

1534
        activity_arn = stepfunctions_activity_arn(
1✔
1535
            name=name, account_id=context.account_id, region_name=context.region
1536
        )
1537
        activities = self.get_store(context).activities
1✔
1538
        if activity_arn not in activities:
1✔
1539
            activity = Activity(arn=activity_arn, name=name)
1✔
1540
            activities[activity_arn] = activity
1✔
1541
        else:
1542
            activity = activities[activity_arn]
1✔
1543

1544
        return CreateActivityOutput(activityArn=activity.arn, creationDate=activity.creation_date)
1✔
1545

1546
    def delete_activity(
1✔
1547
        self, context: RequestContext, activity_arn: Arn, **kwargs
1548
    ) -> DeleteActivityOutput:
1549
        self._validate_activity_arn(activity_arn)
1✔
1550
        self.get_store(context).activities.pop(activity_arn, None)
1✔
1551
        return DeleteActivityOutput()
1✔
1552

1553
    def describe_activity(
1✔
1554
        self, context: RequestContext, activity_arn: Arn, **kwargs
1555
    ) -> DescribeActivityOutput:
1556
        self._validate_activity_arn(activity_arn)
1✔
1557
        activity = self._get_activity(context=context, activity_arn=activity_arn)
1✔
1558
        return activity.to_describe_activity_output()
1✔
1559

1560
    def list_activities(
1✔
1561
        self,
1562
        context: RequestContext,
1563
        max_results: PageSize = None,
1564
        next_token: PageToken = None,
1565
        **kwargs,
1566
    ) -> ListActivitiesOutput:
1567
        activities: list[Activity] = list(self.get_store(context).activities.values())
1✔
1568
        return ListActivitiesOutput(
1✔
1569
            activities=[activity.to_activity_list_item() for activity in activities]
1570
        )
1571

1572
    def _send_activity_task_started(
1✔
1573
        self,
1574
        context: RequestContext,
1575
        task_token: TaskToken,
1576
        worker_name: Name | None,
1577
    ) -> None:
1578
        executions: list[Execution] = self._get_executions(context)
1✔
1579
        for execution in executions:
1✔
1580
            callback_endpoint = execution.exec_worker.env.callback_pool_manager.get(
1✔
1581
                callback_id=task_token
1582
            )
1583
            if isinstance(callback_endpoint, ActivityCallbackEndpoint):
1✔
1584
                callback_endpoint.notify_activity_task_start(worker_name=worker_name)
1✔
1585
                return
1✔
UNCOV
1586
        raise InvalidToken()
×
1587

1588
    @staticmethod
1✔
1589
    def _pull_activity_task(activity: Activity) -> ActivityTask | None:
1✔
1590
        seconds_left = 60
1✔
1591
        while seconds_left > 0:
1✔
1592
            try:
1✔
1593
                return activity.get_task()
1✔
1594
            except IndexError:
1✔
1595
                time.sleep(1)
1✔
1596
                seconds_left -= 1
1✔
UNCOV
1597
        return None
×
1598

1599
    def get_activity_task(
1✔
1600
        self,
1601
        context: RequestContext,
1602
        activity_arn: Arn,
1603
        worker_name: Name = None,
1604
        **kwargs,
1605
    ) -> GetActivityTaskOutput:
1606
        self._validate_activity_arn(activity_arn)
1✔
1607

1608
        activity = self._get_activity(context=context, activity_arn=activity_arn)
1✔
1609
        maybe_task: ActivityTask | None = self._pull_activity_task(activity=activity)
1✔
1610
        if maybe_task is not None:
1✔
1611
            self._send_activity_task_started(
1✔
1612
                context, maybe_task.task_token, worker_name=worker_name
1613
            )
1614
            return GetActivityTaskOutput(
1✔
1615
                taskToken=maybe_task.task_token, input=maybe_task.task_input
1616
            )
1617

UNCOV
1618
        return GetActivityTaskOutput(taskToken=None, input=None)
×
1619

1620
    def validate_state_machine_definition(
1✔
1621
        self, context: RequestContext, request: ValidateStateMachineDefinitionInput, **kwargs
1622
    ) -> ValidateStateMachineDefinitionOutput:
1623
        # TODO: increase parity of static analysers, current implementation is an unblocker for this API action.
1624
        # TODO: add support for ValidateStateMachineDefinitionSeverity
1625
        # TODO: add support for ValidateStateMachineDefinitionMaxResult
1626

1627
        state_machine_type: StateMachineType = request.get("type", StateMachineType.STANDARD)
1✔
1628
        definition: str = request["definition"]
1✔
1629

1630
        static_analysers = []
1✔
1631
        if state_machine_type == StateMachineType.STANDARD:
1✔
1632
            static_analysers.append(StaticAnalyser())
1✔
1633
        else:
1634
            static_analysers.append(ExpressStaticAnalyser())
1✔
1635

1636
        diagnostics: ValidateStateMachineDefinitionDiagnosticList = []
1✔
1637
        try:
1✔
1638
            StepFunctionsProvider._validate_definition(
1✔
1639
                definition=definition, static_analysers=static_analysers
1640
            )
1641
            validation_result = ValidateStateMachineDefinitionResultCode.OK
1✔
1642
        except InvalidDefinition as invalid_definition:
1✔
1643
            validation_result = ValidateStateMachineDefinitionResultCode.FAIL
1✔
1644
            diagnostics.append(
1✔
1645
                ValidateStateMachineDefinitionDiagnostic(
1646
                    severity=ValidateStateMachineDefinitionSeverity.ERROR,
1647
                    code="SCHEMA_VALIDATION_FAILED",
1648
                    message=invalid_definition.message,
1649
                )
1650
            )
UNCOV
1651
        except Exception as ex:
×
UNCOV
1652
            validation_result = ValidateStateMachineDefinitionResultCode.FAIL
×
UNCOV
1653
            LOG.error("Unknown error during validation %s", ex)
×
1654

1655
        return ValidateStateMachineDefinitionOutput(
1✔
1656
            result=validation_result, diagnostics=diagnostics, truncated=False
1657
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc