• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 22700205643

04 Mar 2026 04:24PM UTC coverage: 86.938% (-0.01%) from 86.951%
22700205643

push

github

web-flow
Lambda: fix attribute exceptions (#13863)

3 of 4 new or added lines in 1 file covered. (75.0%)

78 existing lines in 5 files now uncovered.

69850 of 80345 relevant lines covered (86.94%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.77
/localstack-core/localstack/services/stepfunctions/provider.py
1
import copy
1✔
2
import datetime
1✔
3
import json
1✔
4
import logging
1✔
5
import re
1✔
6
import time
1✔
7
from typing import Final
1✔
8

9
from localstack.aws.api import CommonServiceException, RequestContext
1✔
10
from localstack.aws.api.stepfunctions import (
1✔
11
    ActivityDoesNotExist,
12
    AliasDescription,
13
    Arn,
14
    CharacterRestrictedName,
15
    ConflictException,
16
    CreateActivityOutput,
17
    CreateStateMachineAliasOutput,
18
    CreateStateMachineInput,
19
    CreateStateMachineOutput,
20
    Definition,
21
    DeleteActivityOutput,
22
    DeleteStateMachineAliasOutput,
23
    DeleteStateMachineOutput,
24
    DeleteStateMachineVersionOutput,
25
    DescribeActivityOutput,
26
    DescribeExecutionOutput,
27
    DescribeMapRunOutput,
28
    DescribeStateMachineAliasOutput,
29
    DescribeStateMachineForExecutionOutput,
30
    DescribeStateMachineOutput,
31
    EncryptionConfiguration,
32
    ExecutionDoesNotExist,
33
    ExecutionList,
34
    ExecutionRedriveFilter,
35
    ExecutionStatus,
36
    GetActivityTaskOutput,
37
    GetExecutionHistoryOutput,
38
    IncludedData,
39
    IncludeExecutionDataGetExecutionHistory,
40
    InspectionLevel,
41
    InvalidArn,
42
    InvalidDefinition,
43
    InvalidExecutionInput,
44
    InvalidLoggingConfiguration,
45
    InvalidName,
46
    InvalidToken,
47
    ListActivitiesOutput,
48
    ListExecutionsOutput,
49
    ListExecutionsPageToken,
50
    ListMapRunsOutput,
51
    ListStateMachineAliasesOutput,
52
    ListStateMachinesOutput,
53
    ListStateMachineVersionsOutput,
54
    ListTagsForResourceOutput,
55
    LoggingConfiguration,
56
    LogLevel,
57
    LongArn,
58
    MaxConcurrency,
59
    MissingRequiredParameter,
60
    Name,
61
    PageSize,
62
    PageToken,
63
    Publish,
64
    PublishStateMachineVersionOutput,
65
    ResourceNotFound,
66
    ReverseOrder,
67
    RevisionId,
68
    RoutingConfigurationList,
69
    SendTaskFailureOutput,
70
    SendTaskHeartbeatOutput,
71
    SendTaskSuccessOutput,
72
    SensitiveCause,
73
    SensitiveData,
74
    SensitiveError,
75
    StartExecutionOutput,
76
    StartSyncExecutionOutput,
77
    StateMachineAliasList,
78
    StateMachineAlreadyExists,
79
    StateMachineDoesNotExist,
80
    StateMachineList,
81
    StateMachineType,
82
    StateMachineTypeNotSupported,
83
    StepfunctionsApi,
84
    StopExecutionOutput,
85
    TagKeyList,
86
    TagList,
87
    TagResourceOutput,
88
    TaskDoesNotExist,
89
    TaskTimedOut,
90
    TaskToken,
91
    TestStateInput,
92
    TestStateOutput,
93
    ToleratedFailureCount,
94
    ToleratedFailurePercentage,
95
    TraceHeader,
96
    TracingConfiguration,
97
    UntagResourceOutput,
98
    UpdateMapRunOutput,
99
    UpdateStateMachineAliasOutput,
100
    UpdateStateMachineOutput,
101
    ValidateStateMachineDefinitionDiagnostic,
102
    ValidateStateMachineDefinitionDiagnosticList,
103
    ValidateStateMachineDefinitionInput,
104
    ValidateStateMachineDefinitionOutput,
105
    ValidateStateMachineDefinitionResultCode,
106
    ValidateStateMachineDefinitionSeverity,
107
    ValidationException,
108
    VersionDescription,
109
)
110
from localstack.services.plugins import ServiceLifecycleHook
1✔
111
from localstack.services.stepfunctions.asl.component.state.state_execution.state_map.iteration.itemprocessor.map_run_record import (
1✔
112
    MapRunRecord,
113
)
114
from localstack.services.stepfunctions.asl.eval.callback.callback import (
1✔
115
    ActivityCallbackEndpoint,
116
    CallbackConsumerTimeout,
117
    CallbackNotifyConsumerError,
118
    CallbackOutcomeFailure,
119
    CallbackOutcomeSuccess,
120
)
121
from localstack.services.stepfunctions.asl.eval.event.logging import (
1✔
122
    CloudWatchLoggingConfiguration,
123
    CloudWatchLoggingSession,
124
)
125
from localstack.services.stepfunctions.asl.parse.asl_parser import (
1✔
126
    ASLParserException,
127
)
128
from localstack.services.stepfunctions.asl.static_analyser.express_static_analyser import (
1✔
129
    ExpressStaticAnalyser,
130
)
131
from localstack.services.stepfunctions.asl.static_analyser.static_analyser import (
1✔
132
    StaticAnalyser,
133
)
134
from localstack.services.stepfunctions.asl.static_analyser.test_state.test_state_analyser import (
1✔
135
    TestStateStaticAnalyser,
136
)
137
from localstack.services.stepfunctions.asl.static_analyser.usage_metrics_static_analyser import (
1✔
138
    UsageMetricsStaticAnalyser,
139
)
140
from localstack.services.stepfunctions.backend.activity import Activity, ActivityTask
1✔
141
from localstack.services.stepfunctions.backend.alias import Alias
1✔
142
from localstack.services.stepfunctions.backend.execution import Execution, SyncExecution
1✔
143
from localstack.services.stepfunctions.backend.state_machine import (
1✔
144
    StateMachineInstance,
145
    StateMachineRevision,
146
    StateMachineVersion,
147
    TestStateMachine,
148
)
149
from localstack.services.stepfunctions.backend.store import SFNStore, sfn_stores
1✔
150
from localstack.services.stepfunctions.backend.test_state.execution import (
1✔
151
    TestStateExecution,
152
)
153
from localstack.services.stepfunctions.backend.test_state.test_state_mock import TestStateMock
1✔
154
from localstack.services.stepfunctions.local_mocking.mock_config import (
1✔
155
    LocalMockTestCase,
156
    load_local_mock_test_case_for,
157
)
158
from localstack.services.stepfunctions.stepfunctions_utils import (
1✔
159
    assert_pagination_parameters_valid,
160
    get_next_page_token_from_arn,
161
    normalise_max_results,
162
)
163
from localstack.state import StateVisitor
1✔
164
from localstack.utils.aws import arns
1✔
165
from localstack.utils.aws.arns import (
1✔
166
    ARN_PARTITION_REGEX,
167
    stepfunctions_activity_arn,
168
    stepfunctions_express_execution_arn,
169
    stepfunctions_standard_execution_arn,
170
    stepfunctions_state_machine_arn,
171
)
172
from localstack.utils.collections import PaginatedList
1✔
173
from localstack.utils.strings import long_uid, short_uid
1✔
174

175
# Module-level logger, following the project-wide LOG convention.
LOG = logging.getLogger(__name__)
176

177

178
class StepFunctionsProvider(StepfunctionsApi, ServiceLifecycleHook):
1✔
179
    # Upper bound accepted for TestState execution timeouts.
    _TEST_STATE_MAX_TIMEOUT_SECONDS: Final[int] = 300  # 5 minutes.
180

181
    @staticmethod
    def get_store(context: RequestContext) -> SFNStore:
        """Return the Step Functions store for the request's account and region."""
        account_stores = sfn_stores[context.account_id]
        return account_stores[context.region]
184

185
    def accept_state_visitor(self, visitor: StateVisitor):
        """Expose the Step Functions stores to the given state visitor."""
        visitor.visit(sfn_stores)
187

188
    # Unqualified, version- or alias-qualified state machine ARNs, optionally
    # followed by a '#<fragment>' suffix.
    _STATE_MACHINE_ARN_REGEX: Final[re.Pattern] = re.compile(
        rf"{ARN_PARTITION_REGEX}:states:[a-z0-9-]+:[0-9]{{12}}:stateMachine:[a-zA-Z0-9-_.]+(:\d+)?(:[a-zA-Z0-9-_.]+)*(?:#[a-zA-Z0-9-_]+)?$"
    )

    # State machine, standard execution or express execution ARNs.
    _STATE_MACHINE_EXECUTION_ARN_REGEX: Final[re.Pattern] = re.compile(
        rf"{ARN_PARTITION_REGEX}:states:[a-z0-9-]+:[0-9]{{12}}:(stateMachine|execution|express):[a-zA-Z0-9-_.]+(:\d+)?(:[a-zA-Z0-9-_.]+)*$"
    )

    # Activity ARNs; the activity name portion is limited to 80 characters.
    _ACTIVITY_ARN_REGEX: Final[re.Pattern] = re.compile(
        rf"{ARN_PARTITION_REGEX}:states:[a-z0-9-]+:[0-9]{{12}}:activity:[a-zA-Z0-9-_\.]{{1,80}}$"
    )

    # Alias ARNs: '<state machine arn>:<alias name>'.
    _ALIAS_ARN_REGEX: Final[re.Pattern] = re.compile(
        rf"{ARN_PARTITION_REGEX}:states:[a-z0-9-]+:[0-9]{{12}}:stateMachine:[A-Za-z0-9_.-]+:[A-Za-z_.-]+[A-Za-z0-9_.-]{{0,80}}$"
    )

    # Alias names: word/dash/dot characters with at least one non-digit (lookahead).
    _ALIAS_NAME_REGEX: Final[re.Pattern] = re.compile(r"^(?=.*[a-zA-Z_\-\.])[a-zA-Z0-9_\-\.]+$")
205

206
    @staticmethod
    def _validate_state_machine_arn(state_machine_arn: str) -> None:
        """Raise InvalidArn if the state machine ARN does not match the expected shape."""
        # TODO: InvalidArn exception message do not communicate which part of the ARN is incorrect.
        if StepFunctionsProvider._STATE_MACHINE_ARN_REGEX.match(state_machine_arn) is None:
            raise InvalidArn(f"Invalid arn: '{state_machine_arn}'")
211

212
    @staticmethod
    def _raise_state_machine_does_not_exist(state_machine_arn: str) -> None:
        """Raise the standard StateMachineDoesNotExist error for the given ARN."""
        message = f"State Machine Does Not Exist: '{state_machine_arn}'"
        raise StateMachineDoesNotExist(message)
215

216
    @staticmethod
    def _validate_state_machine_execution_arn(execution_arn: str) -> None:
        """Raise InvalidArn if the execution ARN does not match the expected shape."""
        # TODO: InvalidArn exception message do not communicate which part of the ARN is incorrect.
        if StepFunctionsProvider._STATE_MACHINE_EXECUTION_ARN_REGEX.match(execution_arn) is None:
            raise InvalidArn(f"Invalid arn: '{execution_arn}'")
221

222
    @staticmethod
    def _validate_activity_arn(activity_arn: str) -> None:
        """Raise InvalidArn if the activity ARN does not match the expected shape."""
        # TODO: InvalidArn exception message do not communicate which part of the ARN is incorrect.
        if StepFunctionsProvider._ACTIVITY_ARN_REGEX.match(activity_arn) is None:
            raise InvalidArn(f"Invalid arn: '{activity_arn}'")
227

228
    @staticmethod
    def _validate_state_machine_alias_arn(state_machine_alias_arn: Arn) -> None:
        """Raise InvalidArn if the alias ARN does not match the expected shape."""
        if StepFunctionsProvider._ALIAS_ARN_REGEX.match(state_machine_alias_arn) is None:
            raise InvalidArn(f"Invalid arn: '{state_machine_alias_arn}'")
232

233
    def _raise_state_machine_type_not_supported(self):
        """Raise StateMachineTypeNotSupported for operations unavailable on this workflow type."""
        message = "This operation is not supported by this type of state machine"
        raise StateMachineTypeNotSupported(message)
237

238
    @staticmethod
    def _raise_resource_type_not_in_context(resource_type: str) -> None:
        """Raise InvalidArn indicating the ARN's resource type is not valid for this operation."""
        raise InvalidArn(
            f"Invalid Arn: 'Resource type not valid in this context: {resource_type.lower()}'"
        )
244

245
    @staticmethod
    def _validate_activity_name(name: str) -> None:
        """Validate an activity name against the AWS naming restrictions.

        Per the CreateActivity API reference, names are 1-80 characters long and must
        not contain: whitespace, brackets (< > { } [ ]), wildcards (? *), the special
        characters " # % \\ ^ | ~ ` $ & , ; : /, or control characters
        (U+0000-001F, U+007F-009F).
        https://docs.aws.amazon.com/step-functions/latest/apireference/API_CreateActivity.html#API_CreateActivity_RequestSyntax

        :raises InvalidName: if the name violates any of the constraints above.
        """
        if not (1 <= len(name) <= 80):
            raise InvalidName(f"Invalid Name: '{name}'")
        forbidden = set(' <>{}[]?*"#%\\^|~`$&,;:/')
        forbidden.update(chr(code) for code in range(32))
        forbidden.update(chr(code) for code in range(127, 160))
        if any(character in forbidden for character in name):
            raise InvalidName(f"Invalid Name: '{name}'")
262

263
    @staticmethod
    def _validate_state_machine_alias_name(name: CharacterRestrictedName) -> None:
        """Validate an alias name's length and character constraints.

        :raises ValidationException: if the name exceeds 80 characters or does not
            match the alias-name pattern.
        """
        if len(name) > 80:
            raise ValidationException(
                f"1 validation error detected: Value '{name}' at 'name' failed to satisfy constraint: "
                f"Member must have length less than or equal to 80"
            )
        if StepFunctionsProvider._ALIAS_NAME_REGEX.match(name) is None:
            # TODO: explore more error cases in which more than one validation error may occur which results
            #  in the counter below being greater than 1.
            raise ValidationException(
                f"1 validation error detected: Value '{name}' at 'name' failed to satisfy constraint: "
                f"Member must satisfy regular expression pattern: ^(?=.*[a-zA-Z_\\-\\.])[a-zA-Z0-9_\\-\\.]+$"
            )
278

279
    def _get_execution(self, context: RequestContext, execution_arn: Arn) -> Execution:
        """Look up an execution by ARN, raising ExecutionDoesNotExist when absent."""
        store = self.get_store(context)
        execution = store.executions.get(execution_arn)
        if not execution:
            raise ExecutionDoesNotExist(f"Execution Does Not Exist: '{execution_arn}'")
        return execution
284

285
    def _get_executions(
        self,
        context: RequestContext,
        execution_status: ExecutionStatus | None = None,
    ) -> list[Execution]:
        """Return all executions in the store, optionally filtered by status.

        :param context: request context identifying the account and region.
        :param execution_status: when given, only executions with this status are returned.
        :return: a new list of matching executions.
        """
        store = self.get_store(context)
        executions: list[Execution] = list(store.executions.values())
        if execution_status:
            # Filter the snapshot taken above instead of re-reading the store a
            # second time (the original re-iterated store.executions.values()).
            executions = [e for e in executions if e.exec_status == execution_status]
        return executions
300

301
    def _get_activity(self, context: RequestContext, activity_arn: Arn) -> Activity:
        """Look up an activity by ARN, raising ActivityDoesNotExist when absent."""
        store = self.get_store(context)
        activity = store.activities.get(activity_arn)
        if activity is None:
            raise ActivityDoesNotExist(f"Activity Does Not Exist: '{activity_arn}'")
        return activity
306

307
    def _idempotent_revision(
        self,
        context: RequestContext,
        name: str,
        definition: Definition,
        state_machine_type: StateMachineType,
        logging_configuration: LoggingConfiguration,
        tracing_configuration: TracingConfiguration,
    ) -> StateMachineRevision | None:
        """Return an existing revision matching this CreateStateMachine request, if any.

        CreateStateMachine's idempotency check is based on the state machine name,
        definition, type, LoggingConfiguration and TracingConfiguration. If a
        following request has a different roleArn or tags, Step Functions ignores
        these differences and treats it as an idempotent repeat of the previous
        request; roleArn and tags are then not updated, even if they differ.
        """
        for state_machine in self.get_store(context).state_machines.values():
            if not isinstance(state_machine, StateMachineRevision):
                continue
            matches = (
                state_machine.name == name
                and state_machine.definition == definition
                and state_machine.sm_type == state_machine_type
                and state_machine.logging_config == logging_configuration
                and state_machine.tracing_config == tracing_configuration
            )
            if matches:
                return state_machine
        return None
338

339
    def _idempotent_start_execution(
        self,
        execution: Execution | None,
        state_machine: StateMachineInstance,
        name: Name,
        input_data: SensitiveData,
    ) -> Execution | None:
        """Return the running execution when this StartExecution call is idempotent.

        StartExecution is idempotent for STANDARD workflows: calling it with the same
        name and input as a running execution succeeds and returns the same response
        as the original request. If the execution is closed or the input differs, a
        400 ExecutionAlreadyExists error is raised. Names can be reused after 90 days.
        """
        if not execution:
            return None

        # Equivalent to the original match/case over (name, input, status, type):
        # value patterns compare with equality.
        is_idempotent_repeat = (
            name == execution.name
            and input_data == execution.input_data
            and execution.exec_status == ExecutionStatus.RUNNING
            and state_machine.sm_type == StateMachineType.STANDARD
        )
        if is_idempotent_repeat:
            return execution

        raise CommonServiceException(
            code="ExecutionAlreadyExists",
            message=f"Execution Already Exists: '{execution.exec_arn}'",
        )
368

369
    def _revision_by_name(
        self, context: RequestContext, name: str
    ) -> StateMachineRevision | None:
        """Return the state machine revision with the given name, or None.

        Note: the return annotation was narrowed from StateMachineInstance | None to
        StateMachineRevision | None — only revisions are ever returned, matching
        callers (e.g. create_state_machine) that already annotate the result as
        StateMachineRevision | None.
        """
        for state_machine in self.get_store(context).state_machines.values():
            if isinstance(state_machine, StateMachineRevision) and state_machine.name == name:
                return state_machine
        return None
377

378
    @staticmethod
    def _validate_definition(definition: str, static_analysers: list[StaticAnalyser]) -> None:
        """Run the given static analysers over a definition, mapping failures to InvalidDefinition.

        ASL parser errors are reported via their repr; any other analyser failure is
        reported with the exception's class name and arguments plus the definition.
        """
        try:
            for analyser in static_analysers:
                analyser.analyse(definition)
        except ASLParserException as asl_parser_exception:
            invalid_definition = InvalidDefinition()
            invalid_definition.message = repr(asl_parser_exception)
            raise invalid_definition
        except Exception as exception:
            invalid_definition = InvalidDefinition()
            invalid_definition.message = (
                f"Error={exception.__class__.__name__} "
                f"Args={list(exception.args)} in definition '{definition}'."
            )
            raise invalid_definition
395

396
    @staticmethod
    def _sanitise_logging_configuration(
        logging_configuration: LoggingConfiguration,
    ) -> None:
        """Validate and normalise a logging configuration in place.

        Ensures at most one log destination is given and that any level other than
        OFF has a destination; then applies the defaults level=OFF and
        includeExecutionData=False.
        """
        level = logging_configuration.get("level")
        destinations = logging_configuration.get("destinations")

        if destinations is not None and len(destinations) > 1:
            raise InvalidLoggingConfiguration(
                "Invalid Logging Configuration: Must specify exactly one Log Destination."
            )

        # A LogLevel that is not OFF must come with a destination.
        if level is not None and level != LogLevel.OFF and not destinations:
            raise InvalidLoggingConfiguration(
                "Invalid Logging Configuration: Must specify exactly one Log Destination."
            )

        # Apply defaults: level defaults to OFF, includeExecutionData to False.
        logging_configuration["level"] = level or LogLevel.OFF
        logging_configuration["includeExecutionData"] = logging_configuration.get(
            "includeExecutionData", False
        )
423

424
    def create_state_machine(
        self, context: RequestContext, request: CreateStateMachineInput, **kwargs
    ) -> CreateStateMachineOutput:
        """Create a state machine revision and, when requested, publish its first version.

        The operation is idempotent: a request whose name, definition, type, logging
        and tracing configuration match an existing revision returns that revision.
        A request that reuses an existing name without matching otherwise raises
        StateMachineAlreadyExists.
        """
        publish = request.get("publish", False)
        if not publish and request.get("versionDescription"):
            raise ValidationException("Version description can only be set when publish is true")

        # Pull the request parameters, applying defaults where applicable.
        name = request["name"]
        role_arn = request["roleArn"]
        definition = request["definition"]
        sm_type = request.get("type") or StateMachineType.STANDARD
        tracing_configuration = request.get("tracingConfiguration")
        tags = request.get("tags")
        logging_configuration = request.get("loggingConfiguration", LoggingConfiguration())
        self._sanitise_logging_configuration(logging_configuration=logging_configuration)

        # CreateStateMachine is an idempotent API: a matching existing revision
        # short-circuits creation and is returned as-is.
        idem_state_machine = self._idempotent_revision(
            context=context,
            name=name,
            definition=definition,
            state_machine_type=sm_type,
            logging_configuration=logging_configuration,
            tracing_configuration=tracing_configuration,
        )
        if idem_state_machine is not None:
            return CreateStateMachineOutput(
                stateMachineArn=idem_state_machine.arn,
                creationDate=idem_state_machine.create_date,
            )

        # A non-idempotent request reusing an existing name is a conflict.
        state_machine_with_name = self._revision_by_name(context=context, name=name)
        if state_machine_with_name is not None:
            raise StateMachineAlreadyExists(
                f"State Machine Already Exists: '{state_machine_with_name.arn}'"
            )

        # Compute the new revision's ARN.
        state_machine_arn = stepfunctions_state_machine_arn(
            name=name,
            account_id=context.account_id,
            region_name=context.region,
        )
        state_machines = self.get_store(context).state_machines

        # Reduce the logging configuration to a usable CloudWatch representation and
        # validate the destinations, if any were given.
        cloud_watch_logging_configuration = (
            CloudWatchLoggingConfiguration.from_logging_configuration(
                state_machine_arn=state_machine_arn,
                logging_configuration=logging_configuration,
            )
        )
        if cloud_watch_logging_configuration is not None:
            cloud_watch_logging_configuration.validate()

        # Statically analyse the definition; EXPRESS workflows have extra constraints.
        if sm_type == StateMachineType.EXPRESS:
            analysers: list[StaticAnalyser] = [ExpressStaticAnalyser()]
        else:
            analysers = [StaticAnalyser()]
        StepFunctionsProvider._validate_definition(
            definition=definition, static_analysers=analysers
        )

        # Create the revision and register it in the store.
        state_machine = StateMachineRevision(
            name=name,
            arn=state_machine_arn,
            role_arn=role_arn,
            definition=definition,
            sm_type=sm_type,
            logging_config=logging_configuration,
            cloud_watch_logging_configuration=cloud_watch_logging_configuration,
            tracing_config=tracing_configuration,
            tags=tags,
        )
        state_machines[state_machine_arn] = state_machine

        create_output = CreateStateMachineOutput(
            stateMachineArn=state_machine.arn, creationDate=state_machine.create_date
        )

        # Publish the first version when the 'publish' flag is set.
        if publish:
            state_machine_version = state_machine.create_version(
                description=request.get("versionDescription")
            )
            if state_machine_version is not None:
                state_machines[state_machine_version.arn] = state_machine_version
                create_output["stateMachineVersionArn"] = state_machine_version.arn

        # Run the usage-metrics static analyser over the definition.
        UsageMetricsStaticAnalyser.process(definition)

        return create_output
530

531
    def _validate_state_machine_alias_routing_configuration(
        self, context: RequestContext, routing_configuration_list: RoutingConfigurationList
    ) -> None:
        """Validate an alias routing configuration, replicating AWS's error formats.

        Checks, in order: list length (1-2 entries), distinct version ARNs, weight
        bounds (0-100 each), weight sum (exactly 100), and finally that every entry
        references an existing state machine version of one shared revision.

        :raises ValidationException: on the first violated constraint.
        """
        # TODO: to match AWS's approach best validation exceptions could be
        #  built in a process decoupled from the provider.

        routing_configuration_list_len = len(routing_configuration_list)
        if not (1 <= routing_configuration_list_len <= 2):
            # Replicate the object string dump format:
            # [RoutingConfigurationListItem(stateMachineVersionArn=arn_no_quotes, weight=int), ...]
            routing_configuration_serialization_parts = []
            for routing_configuration in routing_configuration_list:
                routing_configuration_serialization_parts.append(
                    "".join(
                        [
                            "RoutingConfigurationListItem(stateMachineVersionArn=",
                            routing_configuration["stateMachineVersionArn"],
                            ", weight=",
                            str(routing_configuration["weight"]),
                            ")",
                        ]
                    )
                )
            routing_configuration_serialization_list = (
                f"[{', '.join(routing_configuration_serialization_parts)}]"
            )
            raise ValidationException(
                f"1 validation error detected: Value '{routing_configuration_serialization_list}' "
                "at 'routingConfiguration' failed to "
                "satisfy constraint: Member must have length less than or equal to 2"
            )

        routing_configuration_arn_list = [
            routing_configuration["stateMachineVersionArn"]
            for routing_configuration in routing_configuration_list
        ]
        # Duplicate version ARNs are rejected.
        if len(set(routing_configuration_arn_list)) < routing_configuration_list_len:
            arn_list_string = f"[{', '.join(routing_configuration_arn_list)}]"
            raise ValidationException(
                "Routing configuration must contain distinct state machine version ARNs. "
                f"Received: {arn_list_string}"
            )

        routing_weights = [
            routing_configuration["weight"] for routing_configuration in routing_configuration_list
        ]
        # Each weight must be within [0, 100]; error messages are 1-indexed.
        for i, weight in enumerate(routing_weights):
            # TODO: check for weight type.
            if weight < 0:
                raise ValidationException(
                    f"Invalid value for parameter routingConfiguration[{i + 1}].weight, value: {weight}, valid min value: 0"
                )
            if weight > 100:
                raise ValidationException(
                    f"1 validation error detected: Value '{weight}' at 'routingConfiguration.{i + 1}.member.weight' "
                    "failed to satisfy constraint: Member must have value less than or equal to 100"
                )
        routing_weights_sum = sum(routing_weights)
        if not routing_weights_sum == 100:
            raise ValidationException(
                f"Sum of routing configuration weights must equal 100. Received: {json.dumps(routing_weights)}"
            )

        store = self.get_store(context=context)
        state_machines = store.state_machines

        # All entries must reference versions of the same state machine revision;
        # the first entry's revision ARN is the reference.
        first_routing_qualified_arn = routing_configuration_arn_list[0]
        shared_state_machine_revision_arn = self._get_state_machine_arn_from_qualified_arn(
            qualified_arn=first_routing_qualified_arn
        )
        for routing_configuration_arn in routing_configuration_arn_list:
            maybe_state_machine_version = state_machines.get(routing_configuration_arn)
            if not isinstance(maybe_state_machine_version, StateMachineVersion):
                arn_list_string = f"[{', '.join(routing_configuration_arn_list)}]"
                raise ValidationException(
                    f"Routing configuration must contain state machine version ARNs. Received: {arn_list_string}"
                )
            state_machine_revision_arn = self._get_state_machine_arn_from_qualified_arn(
                qualified_arn=routing_configuration_arn
            )
            if state_machine_revision_arn != shared_state_machine_revision_arn:
                # NOTE(review): placeholder message — AWS's actual error text for
                # mixed-revision routing configurations is not replicated yet.
                raise ValidationException("TODO")
613

614
    @staticmethod
1✔
615
    def _get_state_machine_arn_from_qualified_arn(qualified_arn: Arn) -> Arn:
1✔
616
        last_colon_index = qualified_arn.rfind(":")
1✔
617
        base_arn = qualified_arn[:last_colon_index]
1✔
618
        return base_arn
1✔
619

620
    def create_state_machine_alias(
        self,
        context: RequestContext,
        name: CharacterRestrictedName,
        routing_configuration: RoutingConfigurationList,
        description: AliasDescription = None,
        **kwargs,
    ) -> CreateStateMachineAliasOutput:
        """Create a state machine alias, or idempotently return an existing one.

        :raises ConflictException: when an alias with the same name but a different
            routing configuration already exists.
        :raises RuntimeError: when the referenced state machine revision no longer exists.
        """
        # Validate the inputs.
        self._validate_state_machine_alias_name(name=name)
        self._validate_state_machine_alias_routing_configuration(
            context=context, routing_configuration_list=routing_configuration
        )

        # Determine the state machine arn this alias maps to; done unsafely as
        # validation already took place above.
        first_routing_qualified_arn = routing_configuration[0]["stateMachineVersionArn"]
        state_machine_revision_arn = self._get_state_machine_arn_from_qualified_arn(
            qualified_arn=first_routing_qualified_arn
        )
        alias = Alias(
            state_machine_arn=state_machine_revision_arn,
            name=name,
            description=description,
            routing_configuration_list=routing_configuration,
        )
        state_machine_alias_arn = alias.state_machine_alias_arn

        store = self.get_store(context=context)

        aliases = store.aliases
        if maybe_idempotent_alias := aliases.get(state_machine_alias_arn):
            if alias.is_idempotent(maybe_idempotent_alias):
                return CreateStateMachineAliasOutput(
                    stateMachineAliasArn=state_machine_alias_arn, creationDate=alias.create_date
                )
            # CreateStateMachineAlias is an idempotent API. Idempotent requests won't create duplicate resources.
            raise ConflictException(
                "Failed to create alias because an alias with the same name and a "
                "different routing configuration already exists."
            )

        # Resolve the revision *before* registering the alias: previously the alias
        # was inserted into the store first, so a failing lookup left a dangling
        # alias behind.
        state_machine_revision = store.state_machines.get(state_machine_revision_arn)
        if not isinstance(state_machine_revision, StateMachineRevision):
            # The state machine was deleted but not the version referenced in this context.
            raise RuntimeError(f"No state machine revision for arn '{state_machine_revision_arn}'")

        aliases[state_machine_alias_arn] = alias
        state_machine_revision.aliases.add(alias)

        return CreateStateMachineAliasOutput(
            stateMachineAliasArn=state_machine_alias_arn, creationDate=alias.create_date
        )
673

674
    def describe_state_machine(
        self,
        context: RequestContext,
        state_machine_arn: Arn,
        included_data: IncludedData = None,
        **kwargs,
    ) -> DescribeStateMachineOutput:
        """Return the description of the state machine identified by the given ARN."""
        self._validate_state_machine_arn(state_machine_arn)
        store = self.get_store(context)
        target = store.state_machines.get(state_machine_arn)
        if target is None:
            # Nothing registered under this ARN (neither a revision nor a version).
            self._raise_state_machine_does_not_exist(state_machine_arn)
        return target.describe()
    def describe_state_machine_alias(
        self, context: RequestContext, state_machine_alias_arn: Arn, **kwargs
    ) -> DescribeStateMachineAliasOutput:
        """Return the description of the state machine alias identified by its ARN."""
        self._validate_state_machine_alias_arn(state_machine_alias_arn=state_machine_alias_arn)
        store = self.get_store(context=context)
        alias: Alias | None = store.aliases.get(state_machine_alias_arn)
        if alias is None:
            # TODO: assemble the correct exception
            raise ValidationException()
        return alias.to_description()
    def describe_state_machine_for_execution(
        self,
        context: RequestContext,
        execution_arn: Arn,
        included_data: IncludedData = None,
        **kwargs,
    ) -> DescribeStateMachineForExecutionOutput:
        """Describe the state machine snapshot associated with the given execution."""
        self._validate_state_machine_execution_arn(execution_arn)
        target_execution: Execution = self._get_execution(
            context=context, execution_arn=execution_arn
        )
        return target_execution.to_describe_state_machine_for_execution_output()
    def send_task_heartbeat(
        self, context: RequestContext, task_token: TaskToken, **kwargs
    ) -> SendTaskHeartbeatOutput:
        """Record a heartbeat for the task waiting on the given callback token.

        Scans all RUNNING executions for a callback registered under the token.

        Raises:
            TaskTimedOut: the callback consumer already timed out.
            TaskDoesNotExist: any other callback consumer error.
            InvalidToken: no running execution holds a callback for the token.
        """
        running_executions: list[Execution] = self._get_executions(context, ExecutionStatus.RUNNING)
        for execution in running_executions:
            try:
                if execution.exec_worker.env.callback_pool_manager.heartbeat(
                    callback_id=task_token
                ):
                    return SendTaskHeartbeatOutput()
            except CallbackNotifyConsumerError as consumer_error:
                if isinstance(consumer_error, CallbackConsumerTimeout):
                    raise TaskTimedOut()
                else:
                    raise TaskDoesNotExist()
        # Consistency fix: carry the same message as send_task_success/send_task_failure,
        # which raise InvalidToken("Invalid token") in the same situation.
        raise InvalidToken("Invalid token")
    def send_task_success(
        self,
        context: RequestContext,
        task_token: TaskToken,
        output: SensitiveData,
        **kwargs,
    ) -> SendTaskSuccessOutput:
        """Deliver a success outcome to the task waiting on the given callback token.

        Raises TaskTimedOut / TaskDoesNotExist on consumer errors and InvalidToken
        when no running execution holds a callback for the token.
        """
        success_outcome = CallbackOutcomeSuccess(callback_id=task_token, output=output)
        for running_execution in self._get_executions(context, ExecutionStatus.RUNNING):
            callback_pool = running_execution.exec_worker.env.callback_pool_manager
            try:
                delivered = callback_pool.notify(callback_id=task_token, outcome=success_outcome)
            except CallbackNotifyConsumerError as consumer_error:
                if isinstance(consumer_error, CallbackConsumerTimeout):
                    raise TaskTimedOut()
                raise TaskDoesNotExist()
            if delivered:
                return SendTaskSuccessOutput()
        raise InvalidToken("Invalid token")
    def send_task_failure(
        self,
        context: RequestContext,
        task_token: TaskToken,
        error: SensitiveError = None,
        cause: SensitiveCause = None,
        **kwargs,
    ) -> SendTaskFailureOutput:
        """Deliver a failure outcome (error + cause) to the task waiting on the given callback token.

        Raises:
            TaskTimedOut: the callback consumer already timed out.
            TaskDoesNotExist: any other callback consumer error.
            InvalidToken: no execution holds a callback for the token.
        """
        outcome = CallbackOutcomeFailure(callback_id=task_token, error=error, cause=cause)
        store = self.get_store(context)
        # NOTE(review): unlike send_task_success/send_task_heartbeat, this iterates ALL
        # executions in the store rather than only RUNNING ones — confirm this is intended.
        for execution in store.executions.values():
            try:
                # notify() returns truthy when a callback with this id was found and notified.
                if execution.exec_worker.env.callback_pool_manager.notify(
                    callback_id=task_token, outcome=outcome
                ):
                    return SendTaskFailureOutput()
            except CallbackNotifyConsumerError as consumer_error:
                if isinstance(consumer_error, CallbackConsumerTimeout):
                    raise TaskTimedOut()
                else:
                    raise TaskDoesNotExist()
        raise InvalidToken("Invalid token")
    @staticmethod
1✔
772
    def _get_state_machine_arn(state_machine_arn: str) -> str:
1✔
773
        """Extract the state machine ARN by removing the test case suffix."""
774
        return state_machine_arn.split("#")[0]
1✔
775

776
    @staticmethod
1✔
777
    def _get_local_mock_test_case(
1✔
778
        state_machine_arn: str, state_machine_name: str
779
    ) -> LocalMockTestCase | None:
780
        """Extract and load a mock test case from a state machine ARN if present."""
781
        parts = state_machine_arn.split("#")
1✔
782
        if len(parts) != 2:
1✔
783
            return None
1✔
784

785
        mock_test_case_name = parts[1]
1✔
786
        mock_test_case = load_local_mock_test_case_for(
1✔
787
            state_machine_name=state_machine_name, test_case_name=mock_test_case_name
788
        )
789
        if mock_test_case is None:
1✔
790
            raise InvalidName(
×
791
                f"Invalid mock test case name '{mock_test_case_name}' "
792
                f"for state machine '{state_machine_name}'."
793
                "Either the test case is not defined or the mock configuration file "
794
                "could not be loaded. See logs for details."
795
            )
796
        return mock_test_case
1✔
797

798
    def start_execution(
        self,
        context: RequestContext,
        state_machine_arn: Arn,
        name: Name = None,
        input: SensitiveData = None,
        trace_header: TraceHeader = None,
        **kwargs,
    ) -> StartExecutionOutput:
        """Start an asynchronous execution of a state machine, version, or alias.

        The ARN may carry a "#<test case>" suffix selecting a local mock test case.
        Starting an execution is idempotent: an existing execution with the same
        name and input is returned instead of creating a duplicate.

        Raises:
            InvalidExecutionInput: the input payload is not valid JSON.
        """
        self._validate_state_machine_arn(state_machine_arn)

        # Strip any "#<test case>" suffix before resolving the target.
        base_arn = self._get_state_machine_arn(state_machine_arn)
        store = self.get_store(context=context)

        # If the ARN addresses an alias, sample a version ARN from its routing
        # configuration; otherwise resolve the base ARN directly.
        alias: Alias | None = store.aliases.get(base_arn)
        alias_sample_state_machine_version_arn = alias.sample() if alias is not None else None
        unsafe_state_machine: StateMachineInstance | None = store.state_machines.get(
            alias_sample_state_machine_version_arn or base_arn
        )
        if not unsafe_state_machine:
            self._raise_state_machine_does_not_exist(base_arn)

        # Update event change parameters about the state machine and should not affect those about this execution.
        state_machine_clone = copy.deepcopy(unsafe_state_machine)

        if input is None:
            input_data = {}
        else:
            try:
                input_data = json.loads(input)
            except Exception as ex:
                raise InvalidExecutionInput(str(ex))  # TODO: report parsing error like AWS.

        # Executions of a version are attributed to the source revision's ARN.
        normalised_state_machine_arn = (
            state_machine_clone.source_arn
            if isinstance(state_machine_clone, StateMachineVersion)
            else state_machine_clone.arn
        )
        exec_name = name or long_uid()  # TODO: validate name format
        if state_machine_clone.sm_type == StateMachineType.STANDARD:
            exec_arn = stepfunctions_standard_execution_arn(normalised_state_machine_arn, exec_name)
        else:
            # Exhaustive check on STANDARD and EXPRESS type, validated on creation.
            exec_arn = stepfunctions_express_execution_arn(normalised_state_machine_arn, exec_name)

        if execution := store.executions.get(exec_arn):
            # Return already running execution if name and input match
            existing_execution = self._idempotent_start_execution(
                execution=execution,
                state_machine=state_machine_clone,
                name=name,
                input_data=input_data,
            )

            if existing_execution:
                return existing_execution.to_start_output()

        # Create the execution logging session, if logging is configured.
        cloud_watch_logging_session = None
        if state_machine_clone.cloud_watch_logging_configuration is not None:
            cloud_watch_logging_session = CloudWatchLoggingSession(
                execution_arn=exec_arn,
                configuration=state_machine_clone.cloud_watch_logging_configuration,
            )

        # Resolved from the original (possibly suffixed) ARN; None when not mocked.
        local_mock_test_case = self._get_local_mock_test_case(
            state_machine_arn, state_machine_clone.name
        )

        execution = Execution(
            name=exec_name,
            sm_type=state_machine_clone.sm_type,
            role_arn=state_machine_clone.role_arn,
            exec_arn=exec_arn,
            account_id=context.account_id,
            region_name=context.region,
            state_machine=state_machine_clone,
            state_machine_alias_arn=alias.state_machine_alias_arn if alias is not None else None,
            start_date=datetime.datetime.now(tz=datetime.UTC),
            cloud_watch_logging_session=cloud_watch_logging_session,
            input_data=input_data,
            trace_header=trace_header,
            activity_store=self.get_store(context).activities,
            local_mock_test_case=local_mock_test_case,
        )

        store.executions[exec_arn] = execution

        execution.start()
        return execution.to_start_output()
    def start_sync_execution(
        self,
        context: RequestContext,
        state_machine_arn: Arn,
        name: Name = None,
        input: SensitiveData = None,
        trace_header: TraceHeader = None,
        included_data: IncludedData = None,
        **kwargs,
    ) -> StartSyncExecutionOutput:
        """Start a synchronous execution; only supported for EXPRESS state machines.

        Raises:
            InvalidExecutionInput: the input payload is not valid JSON.
            InvalidName: an execution with the derived ARN already exists.
        """
        self._validate_state_machine_arn(state_machine_arn)

        # Strip any "#<test case>" suffix before resolving the target.
        # NOTE(review): unlike start_execution, aliases are not resolved here.
        base_arn = self._get_state_machine_arn(state_machine_arn)
        unsafe_state_machine: StateMachineInstance | None = self.get_store(
            context
        ).state_machines.get(base_arn)
        if not unsafe_state_machine:
            self._raise_state_machine_does_not_exist(base_arn)

        # Synchronous starts are rejected for STANDARD workflows.
        if unsafe_state_machine.sm_type == StateMachineType.STANDARD:
            self._raise_state_machine_type_not_supported()

        # Update event change parameters about the state machine and should not affect those about this execution.
        state_machine_clone = copy.deepcopy(unsafe_state_machine)

        if input is None:
            input_data = {}
        else:
            try:
                input_data = json.loads(input)
            except Exception as ex:
                raise InvalidExecutionInput(str(ex))  # TODO: report parsing error like AWS.

        # Executions of a version are attributed to the source revision's ARN.
        normalised_state_machine_arn = (
            state_machine_clone.source_arn
            if isinstance(state_machine_clone, StateMachineVersion)
            else state_machine_clone.arn
        )
        exec_name = name or long_uid()  # TODO: validate name format
        exec_arn = stepfunctions_express_execution_arn(normalised_state_machine_arn, exec_name)

        if exec_arn in self.get_store(context).executions:
            raise InvalidName()  # TODO

        # Create the execution logging session, if logging is configured.
        cloud_watch_logging_session = None
        if state_machine_clone.cloud_watch_logging_configuration is not None:
            cloud_watch_logging_session = CloudWatchLoggingSession(
                execution_arn=exec_arn,
                configuration=state_machine_clone.cloud_watch_logging_configuration,
            )

        # Resolved from the original (possibly suffixed) ARN; None when not mocked.
        local_mock_test_case = self._get_local_mock_test_case(
            state_machine_arn, state_machine_clone.name
        )

        execution = SyncExecution(
            name=exec_name,
            sm_type=state_machine_clone.sm_type,
            role_arn=state_machine_clone.role_arn,
            exec_arn=exec_arn,
            account_id=context.account_id,
            region_name=context.region,
            state_machine=state_machine_clone,
            start_date=datetime.datetime.now(tz=datetime.UTC),
            cloud_watch_logging_session=cloud_watch_logging_session,
            input_data=input_data,
            trace_header=trace_header,
            activity_store=self.get_store(context).activities,
            local_mock_test_case=local_mock_test_case,
        )
        self.get_store(context).executions[exec_arn] = execution

        execution.start()
        return execution.to_start_sync_execution_output()
    def describe_execution(
        self,
        context: RequestContext,
        execution_arn: Arn,
        included_data: IncludedData = None,
        **kwargs,
    ) -> DescribeExecutionOutput:
        """Describe an execution; only supported for STANDARD state machines."""
        self._validate_state_machine_execution_arn(execution_arn)
        target: Execution = self._get_execution(context=context, execution_arn=execution_arn)

        # Action only compatible with STANDARD workflows.
        if target.sm_type != StateMachineType.STANDARD:
            self._raise_resource_type_not_in_context(resource_type=target.sm_type)

        return target.to_describe_output()
    @staticmethod
1✔
982
    def _list_execution_filter(
1✔
983
        ex: Execution, state_machine_arn: str, status_filter: str | None
984
    ) -> bool:
985
        state_machine_reference_arn_set = {ex.state_machine_arn, ex.state_machine_version_arn}
1✔
986
        if state_machine_arn not in state_machine_reference_arn_set:
1✔
987
            return False
1✔
988

989
        if not status_filter:
1✔
990
            return True
1✔
991
        return ex.exec_status == status_filter
1✔
992

993
    def list_executions(
        self,
        context: RequestContext,
        state_machine_arn: Arn = None,
        status_filter: ExecutionStatus = None,
        max_results: PageSize = None,
        next_token: ListExecutionsPageToken = None,
        map_run_arn: LongArn = None,
        redrive_filter: ExecutionRedriveFilter = None,
        **kwargs,
    ) -> ListExecutionsOutput:
        """List executions of a STANDARD state machine, newest first, paginated.

        Raises a ValidationException-coded error for an unknown status filter or
        when neither a state machine ARN nor a map run ARN is provided.
        """
        self._validate_state_machine_arn(state_machine_arn)
        assert_pagination_parameters_valid(
            max_results=max_results,
            next_token=next_token,
            next_token_length_limit=3096,
        )
        max_results = normalise_max_results(max_results)

        # NOTE(review): this lookup (and its does-not-exist error) runs before the
        # "Must provide a StateMachine ARN or MapRun ARN" validation below — confirm
        # the intended error precedence when state_machine_arn is None.
        state_machine = self.get_store(context).state_machines.get(state_machine_arn)
        if state_machine is None:
            self._raise_state_machine_does_not_exist(state_machine_arn)

        if state_machine.sm_type != StateMachineType.STANDARD:
            self._raise_state_machine_type_not_supported()

        # TODO: add support for paging

        allowed_execution_status = [
            ExecutionStatus.SUCCEEDED,
            ExecutionStatus.TIMED_OUT,
            ExecutionStatus.PENDING_REDRIVE,
            ExecutionStatus.ABORTED,
            ExecutionStatus.FAILED,
            ExecutionStatus.RUNNING,
        ]

        validation_errors = []

        if status_filter and status_filter not in allowed_execution_status:
            validation_errors.append(
                f"Value '{status_filter}' at 'statusFilter' failed to satisfy constraint: Member must satisfy enum value set: [{', '.join(allowed_execution_status)}]"
            )

        if not state_machine_arn and not map_run_arn:
            validation_errors.append("Must provide a StateMachine ARN or MapRun ARN")

        if validation_errors:
            errors_message = "; ".join(validation_errors)
            message = f"{len(validation_errors)} validation {'errors' if len(validation_errors) > 1 else 'error'} detected: {errors_message}"
            raise CommonServiceException(message=message, code="ValidationException")

        # Keep executions that reference this state machine (directly or via a
        # version) and match the optional status filter.
        executions: ExecutionList = [
            execution.to_execution_list_item()
            for execution in self.get_store(context).executions.values()
            if self._list_execution_filter(
                execution,
                state_machine_arn=state_machine_arn,
                status_filter=status_filter,
            )
        ]

        executions.sort(key=lambda item: item["startDate"], reverse=True)

        paginated_executions = PaginatedList(executions)
        page, token_for_next_page = paginated_executions.get_page(
            token_generator=lambda item: get_next_page_token_from_arn(item.get("executionArn")),
            page_size=max_results,
            next_token=next_token,
        )

        return ListExecutionsOutput(executions=page, nextToken=token_for_next_page)
    def list_state_machines(
        self,
        context: RequestContext,
        max_results: PageSize = None,
        next_token: PageToken = None,
        **kwargs,
    ) -> ListStateMachinesOutput:
        """List all state machine revisions, sorted by name and paginated."""
        assert_pagination_parameters_valid(max_results, next_token)
        max_results = normalise_max_results(max_results)

        # Only top-level revisions are listed; published versions are excluded.
        revision_items: StateMachineList = sorted(
            (
                sm.itemise()
                for sm in self.get_store(context).state_machines.values()
                if isinstance(sm, StateMachineRevision)
            ),
            key=lambda item: item["name"],
        )

        page, token_for_next_page = PaginatedList(revision_items).get_page(
            token_generator=lambda item: get_next_page_token_from_arn(item.get("stateMachineArn")),
            page_size=max_results,
            next_token=next_token,
        )

        return ListStateMachinesOutput(stateMachines=page, nextToken=token_for_next_page)
    def list_state_machine_aliases(
        self,
        context: RequestContext,
        state_machine_arn: Arn,
        next_token: PageToken = None,
        max_results: PageSize = None,
        **kwargs,
    ) -> ListStateMachineAliasesOutput:
        """List the aliases of a state machine revision, oldest first, paginated.

        Raises:
            InvalidArn: the ARN does not resolve to a state machine revision.
            InvalidToken: a next_token was given that matches no alias of this revision.
        """
        assert_pagination_parameters_valid(max_results, next_token)

        self._validate_state_machine_arn(state_machine_arn)
        state_machines = self.get_store(context).state_machines
        state_machine_revision = state_machines.get(state_machine_arn)
        if not isinstance(state_machine_revision, StateMachineRevision):
            raise InvalidArn(f"Invalid arn: {state_machine_arn}")

        state_machine_aliases: StateMachineAliasList = []
        # A missing token is trivially valid; otherwise it must match one of the
        # revision's aliases (checked while collecting items below).
        valid_token_found = next_token is None

        for alias in state_machine_revision.aliases:
            state_machine_aliases.append(alias.to_item())
            if alias.tokenized_state_machine_alias_arn == next_token:
                valid_token_found = True

        if not valid_token_found:
            raise InvalidToken("Invalid Token: 'Invalid token'")

        state_machine_aliases.sort(key=lambda item: item["creationDate"])

        paginated_list = PaginatedList(state_machine_aliases)

        # Page size defaults to 100 when max_results is unset or 0 (meaning "no limit"
        # to the caller, capped here).
        paginated_aliases, next_token = paginated_list.get_page(
            token_generator=lambda item: get_next_page_token_from_arn(
                item.get("stateMachineAliasArn")
            ),
            next_token=next_token,
            page_size=100 if max_results == 0 or max_results is None else max_results,
        )

        return ListStateMachineAliasesOutput(
            stateMachineAliases=paginated_aliases, nextToken=next_token
        )
    def list_state_machine_versions(
        self,
        context: RequestContext,
        state_machine_arn: Arn,
        next_token: PageToken = None,
        max_results: PageSize = None,
        **kwargs,
    ) -> ListStateMachineVersionsOutput:
        """List the published versions of a state machine, newest first, paginated."""
        self._validate_state_machine_arn(state_machine_arn)
        assert_pagination_parameters_valid(max_results, next_token)
        max_results = normalise_max_results(max_results)

        state_machines = self.get_store(context).state_machines
        revision = state_machines.get(state_machine_arn)
        if not isinstance(revision, StateMachineRevision):
            raise InvalidArn(f"Invalid arn: {state_machine_arn}")

        version_items = []
        for version_arn in revision.versions.values():
            version = state_machines[version_arn]
            if not isinstance(version, StateMachineVersion):
                # Store inconsistency: a version ARN resolved to a non-version entry.
                raise RuntimeError(
                    f"Expected {version_arn} to be a StateMachine Version, but got '{type(version)}'."
                )
            version_items.append(version.itemise())

        version_items.sort(key=lambda item: item["creationDate"], reverse=True)

        page, token_for_next_page = PaginatedList(version_items).get_page(
            token_generator=lambda item: get_next_page_token_from_arn(
                item.get("stateMachineVersionArn")
            ),
            page_size=max_results,
            next_token=next_token,
        )

        return ListStateMachineVersionsOutput(
            stateMachineVersions=page, nextToken=token_for_next_page
        )
    def get_execution_history(
        self,
        context: RequestContext,
        execution_arn: Arn,
        max_results: PageSize = None,
        reverse_order: ReverseOrder = None,
        next_token: PageToken = None,
        include_execution_data: IncludeExecutionDataGetExecutionHistory = None,
        **kwargs,
    ) -> GetExecutionHistoryOutput:
        """Return the event history of a STANDARD execution.

        TODO: add support for paging, ordering, and other manipulations.
        """
        self._validate_state_machine_execution_arn(execution_arn)
        target: Execution = self._get_execution(context=context, execution_arn=execution_arn)

        # Action only compatible with STANDARD workflows.
        if target.sm_type != StateMachineType.STANDARD:
            self._raise_resource_type_not_in_context(resource_type=target.sm_type)

        history_output: GetExecutionHistoryOutput = target.to_history_output()
        if reverse_order:
            history_output["events"].reverse()
        return history_output
    def delete_state_machine(
        self, context: RequestContext, state_machine_arn: Arn, **kwargs
    ) -> DeleteStateMachineOutput:
        """Delete a state machine revision together with all of its published versions.

        Deleting an unknown ARN is a no-op. TODO: halt executions?
        """
        self._validate_state_machine_arn(state_machine_arn)
        state_machines = self.get_store(context).state_machines
        revision = state_machines.get(state_machine_arn)
        if isinstance(revision, StateMachineRevision):
            state_machines.pop(state_machine_arn)
            # Cascade: drop every version that was published from this revision.
            for version_arn in revision.versions.values():
                state_machines.pop(version_arn, None)
        return DeleteStateMachineOutput()
    def delete_state_machine_alias(
        self, context: RequestContext, state_machine_alias_arn: Arn, **kwargs
    ) -> DeleteStateMachineAliasOutput:
        """Delete a state machine alias and detach it from the revisions it routed to.

        Deleting an unknown alias is a no-op.
        """
        self._validate_state_machine_alias_arn(state_machine_alias_arn=state_machine_alias_arn)
        store = self.get_store(context=context)
        aliases = store.aliases
        if (alias := aliases.pop(state_machine_alias_arn, None)) is not None:
            state_machines = store.state_machines
            # For every version the alias routed to, remove the alias from its
            # source revision's alias set; skip entries that no longer resolve.
            for routing_configuration in alias.get_routing_configuration_list():
                state_machine_version_arn = routing_configuration["stateMachineVersionArn"]
                if (
                    state_machine_version := state_machines.get(state_machine_version_arn)
                ) is None or not isinstance(state_machine_version, StateMachineVersion):
                    continue
                if (
                    state_machine_revision := state_machines.get(state_machine_version.source_arn)
                ) is None or not isinstance(state_machine_revision, StateMachineRevision):
                    continue
                state_machine_revision.aliases.discard(alias)
        # Fix: return the alias-deletion output type declared in the signature
        # (previously returned DeleteStateMachineOutput).
        return DeleteStateMachineAliasOutput()
    def delete_state_machine_version(
        self, context: RequestContext, state_machine_version_arn: LongArn, **kwargs
    ) -> DeleteStateMachineVersionOutput:
        """Delete a published state machine version.

        Deleting an unknown or non-version ARN is a no-op.

        Raises:
            ConflictException: the version is still referenced by one or more aliases.
        """
        self._validate_state_machine_arn(state_machine_version_arn)
        state_machines = self.get_store(context).state_machines

        # Silently succeed when the ARN is unknown or not a version.
        if not (
            state_machine_version := state_machines.get(state_machine_version_arn)
        ) or not isinstance(state_machine_version, StateMachineVersion):
            return DeleteStateMachineVersionOutput()

        # If the source revision still exists, refuse deletion while any alias
        # routes to this version, then unregister it from the revision.
        if (
            state_machine_revision := state_machines.get(state_machine_version.source_arn)
        ) and isinstance(state_machine_revision, StateMachineRevision):
            referencing_alias_names: list[str] = []
            for alias in state_machine_revision.aliases:
                if alias.is_router_for(state_machine_version_arn=state_machine_version_arn):
                    referencing_alias_names.append(alias.name)
            if referencing_alias_names:
                referencing_alias_names_list_body = ", ".join(referencing_alias_names)
                raise ConflictException(
                    "Version to be deleted must not be referenced by an alias. "
                    f"Current list of aliases referencing this version: [{referencing_alias_names_list_body}]"
                )
            state_machine_revision.delete_version(state_machine_version_arn)

        state_machines.pop(state_machine_version.arn, None)
        return DeleteStateMachineVersionOutput()
    def stop_execution(
        self,
        context: RequestContext,
        execution_arn: Arn,
        error: SensitiveError = None,
        cause: SensitiveCause = None,
        **kwargs,
    ) -> StopExecutionOutput:
        """Stop a STANDARD execution, recording the stop time plus optional error and cause."""
        self._validate_state_machine_execution_arn(execution_arn)
        target: Execution = self._get_execution(context=context, execution_arn=execution_arn)

        # Action only compatible with STANDARD workflows.
        if target.sm_type != StateMachineType.STANDARD:
            self._raise_resource_type_not_in_context(resource_type=target.sm_type)

        stop_date = datetime.datetime.now(tz=datetime.UTC)
        target.stop(stop_date=stop_date, cause=cause, error=error)
        return StopExecutionOutput(stopDate=stop_date)
    def update_state_machine(
        self,
        context: RequestContext,
        state_machine_arn: Arn,
        definition: Definition = None,
        role_arn: Arn = None,
        logging_configuration: LoggingConfiguration = None,
        tracing_configuration: TracingConfiguration = None,
        publish: Publish = None,
        version_description: VersionDescription = None,
        encryption_configuration: EncryptionConfiguration = None,
        **kwargs,
    ) -> UpdateStateMachineOutput:
        """Update a state machine's definition, role, and/or logging configuration.

        Creates a new revision when anything changed; with publish=True also
        publishes (or reuses) a version for the resulting revision.

        Raises:
            MissingRequiredParameter: none of definition, role_arn, or
                logging_configuration was provided.
            InvalidDefinition / InvalidLoggingConfiguration: via the validators below.
        """
        self._validate_state_machine_arn(state_machine_arn)
        state_machines = self.get_store(context).state_machines

        state_machine = state_machines.get(state_machine_arn)
        if not isinstance(state_machine, StateMachineRevision):
            self._raise_state_machine_does_not_exist(state_machine_arn)

        # TODO: Add logic to handle metrics for when SFN definitions update
        if not any([definition, role_arn, logging_configuration]):
            raise MissingRequiredParameter(
                "Either the definition, the role ARN, the LoggingConfiguration, "
                "or the TracingConfiguration must be specified"
            )

        if definition is not None:
            self._validate_definition(definition=definition, static_analysers=[StaticAnalyser()])

        if logging_configuration is not None:
            self._sanitise_logging_configuration(logging_configuration=logging_configuration)

        # None when the update produced no changes over the current revision.
        revision_id = state_machine.create_revision(
            definition=definition,
            role_arn=role_arn,
            logging_configuration=logging_configuration,
        )

        version_arn = None
        if publish:
            version = state_machine.create_version(description=version_description)
            if version is not None:
                version_arn = version.arn
                state_machines[version_arn] = version
            else:
                # A version for this revision already exists: reuse its ARN.
                target_revision_id = revision_id or state_machine.revision_id
                version_arn = state_machine.versions[target_revision_id]

        update_output = UpdateStateMachineOutput(updateDate=datetime.datetime.now(tz=datetime.UTC))
        if revision_id is not None:
            update_output["revisionId"] = revision_id
        if version_arn is not None:
            update_output["stateMachineVersionArn"] = version_arn
        return update_output
    def update_state_machine_alias(
        self,
        context: RequestContext,
        state_machine_alias_arn: Arn,
        description: AliasDescription = None,
        routing_configuration: RoutingConfigurationList = None,
        **kwargs,
    ) -> UpdateStateMachineAliasOutput:
        """Update the description and/or routing configuration of a state machine alias.

        Raises MissingRequiredParameter when neither field is supplied, and
        ResourceNotFound when the alias ARN does not resolve in this store.
        """
        self._validate_state_machine_alias_arn(state_machine_alias_arn=state_machine_alias_arn)
        # At least one updatable attribute must be present in the request.
        if not (description or routing_configuration):
            raise MissingRequiredParameter(
                "Either the description or the RoutingConfiguration must be specified"
            )
        if routing_configuration is not None:
            self._validate_state_machine_alias_routing_configuration(
                context=context, routing_configuration_list=routing_configuration
            )
        aliases = self.get_store(context=context).aliases
        alias = aliases.get(state_machine_alias_arn)
        if alias is None:
            raise ResourceNotFound("Request references a resource that does not exist.")

        alias.update(description=description, routing_configuration_list=routing_configuration)
        return UpdateStateMachineAliasOutput(updateDate=alias.update_date)
1362

1363
    def publish_state_machine_version(
        self,
        context: RequestContext,
        state_machine_arn: Arn,
        revision_id: RevisionId = None,
        description: VersionDescription = None,
        **kwargs,
    ) -> PublishStateMachineVersionOutput:
        """Publish a version of the state machine's current revision.

        If `revision_id` is given it must match the machine's current revision,
        otherwise a ConflictException is raised.
        """
        self._validate_state_machine_arn(state_machine_arn)
        state_machines = self.get_store(context).state_machines

        revision = state_machines.get(state_machine_arn)
        if not isinstance(revision, StateMachineRevision):
            self._raise_state_machine_does_not_exist(state_machine_arn)

        if revision_id is not None and revision.revision_id != revision_id:
            raise ConflictException(
                f"Failed to publish the State Machine version for revision {revision_id}. "
                f"The current State Machine revision is {revision.revision_id}."
            )

        version = revision.create_version(description=description)
        if version is not None:
            # A new version object was created; register it in the store.
            state_machines[version.arn] = version
        else:
            # The revision was already published: reuse the existing version.
            # NOTE(review): `versions.get(...)` may return None here — confirm
            # the subsequent indexing cannot raise for a published revision.
            lookup_revision_id = revision_id or revision.revision_id
            existing_version_arn = revision.versions.get(lookup_revision_id)
            version = state_machines[existing_version_arn]

        return PublishStateMachineVersionOutput(
            creationDate=version.create_date,
            stateMachineVersionArn=version.arn,
        )
1396

1397
    def tag_resource(
        self, context: RequestContext, resource_arn: Arn, tags: TagList, **kwargs
    ) -> TagResourceOutput:
        """Attach the given tags to a state machine revision."""
        # TODO: add tagging for activities.
        state_machine = self.get_store(context).state_machines.get(resource_arn)
        # Only revision objects are taggable; versions/other entries are rejected.
        if not isinstance(state_machine, StateMachineRevision):
            raise ResourceNotFound(f"Resource not found: '{resource_arn}'")

        state_machine.tag_manager.add_all(tags)
        return TagResourceOutput()
1408

1409
    def untag_resource(
        self, context: RequestContext, resource_arn: Arn, tag_keys: TagKeyList, **kwargs
    ) -> UntagResourceOutput:
        """Remove the given tag keys from a state machine revision."""
        # TODO: add untagging for activities.
        state_machine = self.get_store(context).state_machines.get(resource_arn)
        # Mirrors tag_resource: only revision objects carry a tag manager.
        if not isinstance(state_machine, StateMachineRevision):
            raise ResourceNotFound(f"Resource not found: '{resource_arn}'")

        state_machine.tag_manager.remove_all(tag_keys)
        return UntagResourceOutput()
1420

1421
    def list_tags_for_resource(
        self, context: RequestContext, resource_arn: Arn, **kwargs
    ) -> ListTagsForResourceOutput:
        """Return the tags currently attached to a state machine revision."""
        # TODO: add untagging for activities.
        state_machine = self.get_store(context).state_machines.get(resource_arn)
        if not isinstance(state_machine, StateMachineRevision):
            raise ResourceNotFound(f"Resource not found: '{resource_arn}'")

        tag_list: TagList = state_machine.tag_manager.to_tag_list()
        return ListTagsForResourceOutput(tags=tag_list)
1432

1433
    def describe_map_run(
        self, context: RequestContext, map_run_arn: LongArn, **kwargs
    ) -> DescribeMapRunOutput:
        """Describe a Map Run by scanning every execution's map-run record pool."""
        store = self.get_store(context)
        for execution in store.executions.values():
            pool_manager = execution.exec_worker.env.map_run_record_pool_manager
            map_run_record: MapRunRecord | None = pool_manager.get(map_run_arn)
            if map_run_record is not None:
                return map_run_record.describe()
        # No execution owns a map run with this ARN.
        raise ResourceNotFound()
1444

1445
    def list_map_runs(
        self,
        context: RequestContext,
        execution_arn: Arn,
        max_results: PageSize = None,
        next_token: PageToken = None,
        **kwargs,
    ) -> ListMapRunsOutput:
        """List every Map Run started by the given execution."""
        # TODO: add support for paging.
        execution = self._get_execution(context=context, execution_arn=execution_arn)
        pool_manager = execution.exec_worker.env.map_run_record_pool_manager
        map_run_items = [record.list_item() for record in pool_manager.get_all()]
        return ListMapRunsOutput(mapRuns=map_run_items)
1461

1462
    def update_map_run(
        self,
        context: RequestContext,
        map_run_arn: LongArn,
        max_concurrency: MaxConcurrency = None,
        tolerated_failure_percentage: ToleratedFailurePercentage = None,
        tolerated_failure_count: ToleratedFailureCount = None,
        **kwargs,
    ) -> UpdateMapRunOutput:
        """Update configurable attributes of a running Map Run.

        Only max_concurrency is currently supported; tolerated-failure updates
        raise NotImplementedError.
        """
        if tolerated_failure_percentage is not None or tolerated_failure_count is not None:
            raise NotImplementedError(
                "Updating of ToleratedFailureCount and ToleratedFailurePercentage is currently unsupported."
            )
        # TODO: investigate behaviour of empty requests.
        store = self.get_store(context)
        for execution in store.executions.values():
            record: MapRunRecord | None = (
                execution.exec_worker.env.map_run_record_pool_manager.get(map_run_arn)
            )
            if record is None:
                continue
            record.update(
                max_concurrency=max_concurrency,
                tolerated_failure_count=tolerated_failure_count,
                tolerated_failure_percentage=tolerated_failure_percentage,
            )
            LOG.warning(
                "StepFunctions UpdateMapRun changes are currently not being reflected in the MapRun instances."
            )
            return UpdateMapRunOutput()
        raise ResourceNotFound()
1492

1493
    def test_state(
        self, context: RequestContext, request: TestStateInput, **kwargs
    ) -> TestStateOutput:
        """Run a single state of a definition in isolation (TestState API).

        Builds an ephemeral TestStateMachine plus a TestStateExecution around the
        requested state, runs it synchronously, and returns the execution's output
        at the requested inspection level.

        Raises ValidationException for an unknown state name or an invalid
        context/mock configuration.
        """
        state_name = request.get("stateName")
        definition = request["definition"]

        # Static validation of the definition, scoped to the state under test.
        StepFunctionsProvider._validate_definition(
            definition=definition,
            static_analysers=[TestStateStaticAnalyser(state_name)],
        )

        # if StateName is present, we need to ensure the state being referenced exists in full definition.
        if state_name and not TestStateStaticAnalyser.is_state_in_definition(
            definition=definition, state_name=state_name
        ):
            raise ValidationException("State not found in definition")

        mock_input = request.get("mock")
        state_configuration = request.get("stateConfiguration")

        TestStateStaticAnalyser.validate_state_configuration(state_configuration, mock_input)
        TestStateStaticAnalyser.validate_mock(test_state_input=request)

        if state_context := request.get("context"):
            # TODO: Add validation ensuring only present if 'mock' is specified
            # An error occurred (ValidationException) when calling the TestState operation: State type 'Pass' is not supported when a mock is specified
            pass

        try:
            state_mock = TestStateMock(
                mock_input=mock_input,
                state_configuration=state_configuration,
                context=state_context,
            )
        except ValueError as e:
            # TestStateMock rejects malformed context objects with ValueError.
            LOG.error(e)
            raise ValidationException(f"Invalid Context object provided: {e}")

        # Synthesize a unique, throwaway state machine name/ARN for this test run.
        name: Name | None = f"TestState-{short_uid()}"
        arn = stepfunctions_state_machine_arn(
            name=name, account_id=context.account_id, region_name=context.region
        )
        role_arn = request.get("roleArn")
        if role_arn is None:
            TestStateStaticAnalyser.validate_role_arn_required(
                mock_input=mock_input, definition=definition, state_name=state_name
            )
            # HACK: Added dummy role ARN because it is a required field in Execution.
            # To allow optional roleArn for the test state but preserve the mandatory one for regular executions
            # we likely need to remove inheritance TestStateExecution(Execution) in favor of composition.
            # TestState execution starts to have too many simplifications compared to a regular execution
            # which renders the inheritance mechanism harmful.
            # TODO make role_arn optional in TestStateExecution
            role_arn = arns.iam_role_arn(
                role_name=f"RoleFor-{name}",
                account_id=context.account_id,
                region_name=context.region,
            )

        state_machine = TestStateMachine(
            name=name,
            arn=arn,
            role_arn=role_arn,
            definition=request["definition"],
        )

        # HACK(gregfurman): The ARN that gets generated has a duplicate 'name' field in the
        # resource ARN. Just replace this duplication and extract the execution ID.
        exec_arn = stepfunctions_express_execution_arn(state_machine.arn, name)
        exec_arn = exec_arn.replace(f":{name}:{name}:", f":{name}:", 1)
        _, exec_name = exec_arn.rsplit(":", 1)

        # Input/variables arrive as JSON strings; decode only when provided.
        if input_json := request.get("input", {}):
            input_json = json.loads(input_json)

        if variables_json := request.get("variables"):
            variables_json = json.loads(variables_json)

        execution = TestStateExecution(
            name=exec_name,
            role_arn=role_arn,
            exec_arn=exec_arn,
            account_id=context.account_id,
            region_name=context.region,
            state_machine=state_machine,
            start_date=datetime.datetime.now(tz=datetime.UTC),
            input_data=input_json,
            state_name=state_name,
            activity_store=self.get_store(context).activities,
            mock=state_mock,
            variables=variables_json,
        )
        # NOTE(review): start() appears to run the test execution synchronously,
        # since the output is read immediately afterwards — confirm in Execution.
        execution.start()

        test_state_output = execution.to_test_state_output(
            inspection_level=request.get("inspectionLevel", InspectionLevel.INFO)
        )

        return test_state_output
1592

1593
    def create_activity(
        self,
        context: RequestContext,
        name: Name,
        tags: TagList = None,
        encryption_configuration: EncryptionConfiguration = None,
        **kwargs,
    ) -> CreateActivityOutput:
        """Create an activity with the given name; repeated calls return the existing one."""
        self._validate_activity_name(name=name)

        activity_arn = stepfunctions_activity_arn(
            name=name, account_id=context.account_id, region_name=context.region
        )
        activities = self.get_store(context).activities
        activity = activities.get(activity_arn)
        if activity is None:
            # First creation for this ARN; the call is idempotent afterwards.
            activity = Activity(arn=activity_arn, name=name)
            activities[activity_arn] = activity

        return CreateActivityOutput(activityArn=activity.arn, creationDate=activity.creation_date)
1614

1615
    def delete_activity(
        self, context: RequestContext, activity_arn: Arn, **kwargs
    ) -> DeleteActivityOutput:
        """Delete the activity if it exists; deleting a missing activity is a no-op."""
        self._validate_activity_arn(activity_arn)
        activities = self.get_store(context).activities
        activities.pop(activity_arn, None)
        return DeleteActivityOutput()
1621

1622
    def describe_activity(
        self, context: RequestContext, activity_arn: Arn, **kwargs
    ) -> DescribeActivityOutput:
        """Return the description of an existing activity."""
        self._validate_activity_arn(activity_arn)
        activity = self._get_activity(context=context, activity_arn=activity_arn)
        describe_output = activity.to_describe_activity_output()
        return describe_output
1628

1629
    def list_activities(
        self,
        context: RequestContext,
        max_results: PageSize = None,
        next_token: PageToken = None,
        **kwargs,
    ) -> ListActivitiesOutput:
        """List all activities in this store.

        NOTE: max_results/next_token paging is not implemented yet.
        """
        activity_items = [
            activity.to_activity_list_item()
            for activity in self.get_store(context).activities.values()
        ]
        return ListActivitiesOutput(activities=activity_items)
1640

1641
    def _send_activity_task_started(
        self,
        context: RequestContext,
        task_token: TaskToken,
        worker_name: Name | None,
    ) -> None:
        """Signal the callback endpoint bound to task_token that the activity task started.

        Raises InvalidToken if no execution holds an activity callback for the token.
        """
        for execution in self._get_executions(context):
            endpoint = execution.exec_worker.env.callback_pool_manager.get(
                callback_id=task_token
            )
            # Only activity callbacks are notified; other endpoint kinds are skipped.
            if isinstance(endpoint, ActivityCallbackEndpoint):
                endpoint.notify_activity_task_start(worker_name=worker_name)
                return
        raise InvalidToken()
1656

1657
    @staticmethod
    def _pull_activity_task(activity: Activity) -> ActivityTask | None:
        """Poll the activity for a pending task for up to ~60 seconds.

        Returns the first task found, or None when the long poll times out.
        """
        # 60 one-second attempts; Activity.get_task raises IndexError when empty.
        for _ in range(60):
            try:
                return activity.get_task()
            except IndexError:
                time.sleep(1)
        return None
1667

1668
    def get_activity_task(
        self,
        context: RequestContext,
        activity_arn: Arn,
        worker_name: Name = None,
        **kwargs,
    ) -> GetActivityTaskOutput:
        """Long-poll for a task scheduled on the given activity.

        Returns an empty (None/None) output when the poll times out without
        a task becoming available.
        """
        self._validate_activity_arn(activity_arn)

        activity = self._get_activity(context=context, activity_arn=activity_arn)
        task: ActivityTask | None = self._pull_activity_task(activity=activity)
        if task is None:
            return GetActivityTaskOutput(taskToken=None, input=None)

        # Notify the owning execution that a worker picked up the task.
        self._send_activity_task_started(context, task.task_token, worker_name=worker_name)
        return GetActivityTaskOutput(taskToken=task.task_token, input=task.task_input)
1688

1689
    def validate_state_machine_definition(
        self, context: RequestContext, request: ValidateStateMachineDefinitionInput, **kwargs
    ) -> ValidateStateMachineDefinitionOutput:
        """Statically validate a state machine definition and report diagnostics."""
        # TODO: increase parity of static analysers, current implementation is an unblocker for this API action.
        # TODO: add support for ValidateStateMachineDefinitionSeverity
        # TODO: add support for ValidateStateMachineDefinitionMaxResult

        state_machine_type: StateMachineType = request.get("type", StateMachineType.STANDARD)
        definition: str = request["definition"]

        # EXPRESS machines get a dedicated analyser with stricter rules.
        if state_machine_type == StateMachineType.STANDARD:
            analysers = [StaticAnalyser()]
        else:
            analysers = [ExpressStaticAnalyser()]

        diagnostics: ValidateStateMachineDefinitionDiagnosticList = []
        validation_result = ValidateStateMachineDefinitionResultCode.OK
        try:
            StepFunctionsProvider._validate_definition(
                definition=definition, static_analysers=analysers
            )
        except InvalidDefinition as invalid_definition:
            validation_result = ValidateStateMachineDefinitionResultCode.FAIL
            diagnostics.append(
                ValidateStateMachineDefinitionDiagnostic(
                    severity=ValidateStateMachineDefinitionSeverity.ERROR,
                    code="SCHEMA_VALIDATION_FAILED",
                    message=invalid_definition.message,
                )
            )
        except Exception as ex:
            # Unexpected analyser failure: fail validation without diagnostics.
            validation_result = ValidateStateMachineDefinitionResultCode.FAIL
            LOG.error("Unknown error during validation %s", ex)

        return ValidateStateMachineDefinitionOutput(
            result=validation_result, diagnostics=diagnostics, truncated=False
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc