• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20449761985

22 Dec 2025 09:22PM UTC coverage: 86.912% (-0.008%) from 86.92%
20449761985

push

github

web-flow
APIGW: improve store typing (#13552)

9 of 9 new or added lines in 1 file covered. (100.0%)

130 existing lines in 7 files now uncovered.

70016 of 80560 relevant lines covered (86.91%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.93
/localstack-core/localstack/testing/pytest/stepfunctions/utils.py
1
import json
1✔
2
import logging
1✔
3
from collections.abc import Callable
1✔
4
from typing import Final
1✔
5

6
from botocore.exceptions import ClientError
1✔
7
from jsonpath_ng.ext import parse
1✔
8
from localstack_snapshot.snapshots.transformer import (
1✔
9
    JsonpathTransformer,
10
    RegexTransformer,
11
    TransformContext,
12
)
13

14
from localstack import config
1✔
15
from localstack.aws.api.stepfunctions import (
1✔
16
    Arn,
17
    CloudWatchLogsLogGroup,
18
    CreateStateMachineOutput,
19
    Definition,
20
    ExecutionStatus,
21
    HistoryEventList,
22
    HistoryEventType,
23
    LogDestination,
24
    LoggingConfiguration,
25
    LogLevel,
26
    LongArn,
27
    StateMachineType,
28
)
29
from localstack.services.stepfunctions.asl.eval.event.logging import is_logging_enabled_for
1✔
30
from localstack.services.stepfunctions.asl.utils.encoding import to_json_str
1✔
31
from localstack.services.stepfunctions.asl.utils.json_path import NoSuchJsonPathError, extract_json
1✔
32
from localstack.testing.aws.util import is_aws_cloud
1✔
33
from localstack.utils.strings import short_uid
1✔
34
from localstack.utils.sync import poll_condition
1✔
35

36
# Module-level logger; the polling helpers in this module report timeouts through it.
LOG = logging.getLogger(__name__)

# Maximum time, in seconds, that the listing/deletion polls in this module wait.
# For EXPRESS state machines, the deletion will happen eventually (usually less than a minute).
# Running executions may emit logs after DeleteStateMachine API is called.
_DELETION_TIMEOUT_SECS: Final[int] = 120
# Seconds between two polling attempts when `is_aws_cloud()` is truthy.
_SAMPLING_INTERVAL_SECONDS_AWS_CLOUD: Final[int] = 1
# Seconds between two polling attempts otherwise.
_SAMPLING_INTERVAL_SECONDS_LOCALSTACK: Final[float] = 0.2
44

45

46
def _get_sampling_interval_seconds() -> int | float:
    """Return the polling interval, in seconds, for the current test target.

    The AWS cloud interval is returned when ``is_aws_cloud()`` is truthy and
    the LocalStack interval otherwise.
    """
    if is_aws_cloud():
        return _SAMPLING_INTERVAL_SECONDS_AWS_CLOUD
    return _SAMPLING_INTERVAL_SECONDS_LOCALSTACK
52

53

54
def await_no_state_machines_listed(stepfunctions_client):
    """Poll ``list_state_machines`` until it returns no state machines.

    If the poll does not succeed within ``_DELETION_TIMEOUT_SECS`` seconds a
    warning is logged; no exception is raised.
    """

    def _nothing_listed() -> bool:
        return not stepfunctions_client.list_state_machines()["stateMachines"]

    if poll_condition(
        condition=_nothing_listed,
        timeout=_DELETION_TIMEOUT_SECS,
        interval=_get_sampling_interval_seconds(),
    ):
        return
    LOG.warning("Timed out whilst awaiting for listing to be empty.")
67

68

69
def _is_state_machine_alias_listed(
    stepfunctions_client, state_machine_arn: Arn, state_machine_alias_arn: Arn
):
    """Return whether an alias is listed for a state machine.

    All result pages of ``list_state_machine_aliases`` are inspected: when a
    response carries a ``nextToken`` the next page is requested with it, so an
    alias that is not on the first page is still found.

    :param stepfunctions_client: client exposing ``list_state_machine_aliases``.
    :param state_machine_arn: ARN of the state machine whose aliases are listed.
    :param state_machine_alias_arn: ARN of the alias to look for.
    :return: ``True`` if an entry's ``stateMachineAliasArn`` equals the given
        alias ARN, ``False`` otherwise.
    """
    request = {"stateMachineArn": state_machine_arn}
    while True:
        page = stepfunctions_client.list_state_machine_aliases(**request)
        if any(
            alias["stateMachineAliasArn"] == state_machine_alias_arn
            for alias in page["stateMachineAliases"]
        ):
            return True
        next_token = page.get("nextToken")
        if not next_token:
            # Last page reached without a match.
            return False
        request["nextToken"] = next_token
80

81

82
def await_state_machine_alias_is_created(
    stepfunctions_client, state_machine_arn: Arn, state_machine_alias_arn: Arn
):
    """Poll until the alias appears in the state machine's alias listing.

    If the poll does not succeed within ``_DELETION_TIMEOUT_SECS`` seconds a
    warning is logged; no exception is raised.

    :param stepfunctions_client: client used to list the aliases.
    :param state_machine_arn: ARN of the state machine owning the alias.
    :param state_machine_alias_arn: ARN of the alias expected to be listed.
    """
    success = poll_condition(
        condition=lambda: _is_state_machine_alias_listed(
            stepfunctions_client=stepfunctions_client,
            state_machine_arn=state_machine_arn,
            state_machine_alias_arn=state_machine_alias_arn,
        ),
        timeout=_DELETION_TIMEOUT_SECS,
        interval=_get_sampling_interval_seconds(),
    )
    if not success:
        # Name the awaited alias so the timeout can be diagnosed from the log.
        LOG.warning(
            "Timed out whilst awaiting for aliases of %s to include '%s'.",
            state_machine_arn,
            state_machine_alias_arn,
        )
96

97

98
def await_state_machine_alias_is_deleted(
    stepfunctions_client, state_machine_arn: Arn, state_machine_alias_arn: Arn
):
    """Poll until the alias no longer appears in the state machine's alias listing.

    If the poll does not succeed within ``_DELETION_TIMEOUT_SECS`` seconds a
    warning is logged; no exception is raised.

    :param stepfunctions_client: client used to list the aliases.
    :param state_machine_arn: ARN of the state machine owning the alias.
    :param state_machine_alias_arn: ARN of the alias expected to disappear.
    """
    success = poll_condition(
        condition=lambda: not _is_state_machine_alias_listed(
            stepfunctions_client=stepfunctions_client,
            state_machine_arn=state_machine_arn,
            state_machine_alias_arn=state_machine_alias_arn,
        ),
        timeout=_DELETION_TIMEOUT_SECS,
        interval=_get_sampling_interval_seconds(),
    )
    if not success:
        # Name the awaited alias so the timeout can be diagnosed from the log.
        LOG.warning(
            "Timed out whilst awaiting for aliases of %s to exclude '%s'.",
            state_machine_arn,
            state_machine_alias_arn,
        )
112

113

114
def _is_state_machine_listed(stepfunctions_client, state_machine_arn: str) -> bool:
    """Return whether ``state_machine_arn`` appears in ``list_state_machines``.

    All result pages are inspected: when a response carries a ``nextToken``
    the next page is requested with it, so a state machine that is not on the
    first page is still found.

    :param stepfunctions_client: client exposing ``list_state_machines``.
    :param state_machine_arn: ARN to look for.
    :return: ``True`` if an entry's ``stateMachineArn`` equals the given ARN.
    """
    request: dict = {}
    while True:
        page = stepfunctions_client.list_state_machines(**request)
        if any(
            state_machine["stateMachineArn"] == state_machine_arn
            for state_machine in page["stateMachines"]
        ):
            return True
        next_token = page.get("nextToken")
        if not next_token:
            # Last page reached without a match.
            return False
        request = {"nextToken": next_token}
121

122

123
def _is_state_machine_version_listed(
    stepfunctions_client, state_machine_arn: str, state_machine_version_arn: str
) -> bool:
    """Return whether a version is listed for a state machine.

    All result pages of ``list_state_machine_versions`` are inspected: when a
    response carries a ``nextToken`` the next page is requested with it, so a
    version that is not on the first page is still found.

    :param stepfunctions_client: client exposing ``list_state_machine_versions``.
    :param state_machine_arn: ARN of the state machine whose versions are listed.
    :param state_machine_version_arn: ARN of the version to look for.
    :return: ``True`` if an entry's ``stateMachineVersionArn`` equals the given ARN.
    """
    request = {"stateMachineArn": state_machine_arn}
    while True:
        page = stepfunctions_client.list_state_machine_versions(**request)
        if any(
            version["stateMachineVersionArn"] == state_machine_version_arn
            for version in page["stateMachineVersions"]
        ):
            return True
        next_token = page.get("nextToken")
        if not next_token:
            # Last page reached without a match.
            return False
        request["nextToken"] = next_token
132

133

134
def await_state_machine_not_listed(stepfunctions_client, state_machine_arn: str):
    """Poll until ``state_machine_arn`` is no longer listed.

    If the poll does not succeed within ``_DELETION_TIMEOUT_SECS`` seconds a
    warning is logged; no exception is raised.
    """

    def _is_absent() -> bool:
        return not _is_state_machine_listed(stepfunctions_client, state_machine_arn)

    if poll_condition(
        condition=_is_absent,
        timeout=_DELETION_TIMEOUT_SECS,
        interval=_get_sampling_interval_seconds(),
    ):
        return
    LOG.warning("Timed out whilst awaiting for listing to exclude '%s'.", state_machine_arn)
142

143

144
def await_state_machine_listed(stepfunctions_client, state_machine_arn: str):
    """Poll until ``state_machine_arn`` is listed.

    If the poll does not succeed within ``_DELETION_TIMEOUT_SECS`` seconds a
    warning is logged; no exception is raised.
    """

    def _is_present() -> bool:
        return _is_state_machine_listed(stepfunctions_client, state_machine_arn)

    if poll_condition(
        condition=_is_present,
        timeout=_DELETION_TIMEOUT_SECS,
        interval=_get_sampling_interval_seconds(),
    ):
        return
    LOG.warning("Timed out whilst awaiting for listing to include '%s'.", state_machine_arn)
152

153

154
def await_state_machine_version_not_listed(
    stepfunctions_client, state_machine_arn: str, state_machine_version_arn: str
):
    """Poll until the version is no longer listed for the state machine.

    If the poll does not succeed within ``_DELETION_TIMEOUT_SECS`` seconds a
    warning is logged; no exception is raised.
    """

    def _is_absent() -> bool:
        return not _is_state_machine_version_listed(
            stepfunctions_client, state_machine_arn, state_machine_version_arn
        )

    if poll_condition(
        condition=_is_absent,
        timeout=_DELETION_TIMEOUT_SECS,
        interval=_get_sampling_interval_seconds(),
    ):
        return
    LOG.warning(
        "Timed out whilst awaiting for version of %s to exclude '%s'.",
        state_machine_arn,
        state_machine_version_arn,
    )
170

171

172
def await_state_machine_version_listed(
    stepfunctions_client, state_machine_arn: str, state_machine_version_arn: str
):
    """Poll until the version is listed for the state machine.

    If the poll does not succeed within ``_DELETION_TIMEOUT_SECS`` seconds a
    warning is logged; no exception is raised.
    """

    def _is_present() -> bool:
        return _is_state_machine_version_listed(
            stepfunctions_client, state_machine_arn, state_machine_version_arn
        )

    if poll_condition(
        condition=_is_present,
        timeout=_DELETION_TIMEOUT_SECS,
        interval=_get_sampling_interval_seconds(),
    ):
        return
    LOG.warning(
        "Timed out whilst awaiting for version of %s to include '%s'.",
        state_machine_arn,
        state_machine_version_arn,
    )
188

189

190
def await_on_execution_events(
    stepfunctions_client, execution_arn: str, check_func: Callable[[HistoryEventList], bool]
) -> HistoryEventList:
    """Poll an execution's history until ``check_func`` accepts it.

    On every attempt the history is fetched with ``get_execution_history``,
    sorted by each event's ``timestamp`` and passed to ``check_func``.

    :param stepfunctions_client: client exposing ``get_execution_history``.
    :param execution_arn: ARN of the execution whose history is polled.
    :param check_func: predicate over the sorted events; polling stops once it
        returns a truthy value.
    :return: the sorted events of the last polling attempt.
    :raises AssertionError: if the poll does not succeed within 120 seconds.
    """
    events: HistoryEventList = []

    def _run_check() -> bool:
        # The same list object is refilled on every attempt, so the list
        # returned to the caller holds the events of the last attempt.
        events.clear()
        try:
            hist_resp = stepfunctions_client.get_execution_history(executionArn=execution_arn)
        except ClientError:
            # A failed history request counts as "condition not met"; the poll retries.
            return False
        events.extend(sorted(hist_resp.get("events", []), key=lambda event: event.get("timestamp")))
        return check_func(events)

    # Poll outside of an `assert` statement: asserts are removed under
    # `python -O`, which would silently skip the polling altogether.
    satisfied = poll_condition(
        condition=_run_check, timeout=120, interval=_get_sampling_interval_seconds()
    )
    if not satisfied:
        raise AssertionError(
            "Timed out whilst awaiting for execution events to satisfy condition "
            f"for execution '{execution_arn}'."
        )
    return events
210

211

212
def await_execution_success(stepfunctions_client, execution_arn: str) -> HistoryEventList:
    """Wait until the last history event of the execution reports success.

    Success is detected by the ``executionSucceededEventDetails`` key being
    present in the last event. Returns what ``await_on_execution_events`` returns.
    """

    def _check_last_is_success(events: HistoryEventList) -> bool:
        # An empty history cannot have succeeded yet.
        return bool(events) and "executionSucceededEventDetails" in events[-1]

    history = await_on_execution_events(
        stepfunctions_client=stepfunctions_client,
        execution_arn=execution_arn,
        check_func=_check_last_is_success,
    )
    return history
224

225

226
def await_list_execution_status(
    stepfunctions_client, state_machine_arn: str, execution_arn: str, status: str
):
    """Poll ``list_executions`` until the execution is listed with ``status``.

    Required as there is some eventual consistency in list_executions vs
    describe_execution and get_execution_history. A warning is logged, and no
    exception is raised, if the poll does not succeed within 120 seconds.
    """

    def _is_listed_with_status() -> bool:
        response = stepfunctions_client.list_executions(
            stateMachineArn=state_machine_arn, statusFilter=status
        )
        return any(
            entry["executionArn"] == execution_arn and entry["status"] == status
            for entry in response.get("executions", [])
        )

    if poll_condition(
        condition=_is_listed_with_status,
        timeout=120,
        interval=_get_sampling_interval_seconds(),
    ):
        return
    LOG.warning(
        "Timed out whilst awaiting for execution status %s to satisfy condition for execution '%s'.",
        status,
        execution_arn,
    )
250

251

252
def _is_last_history_event_terminal(events: HistoryEventList) -> bool:
    """Return whether the last event of ``events`` marks the end of an execution.

    An empty list yields ``False``. A last event without a ``type`` is treated
    as terminal, as is one whose type is ExecutionFailed, ExecutionAborted,
    ExecutionTimedOut or ExecutionSucceeded.
    """
    if not events:
        return False
    final_event_type = events[-1].get("type")
    if final_event_type is None:
        return True
    terminal_types = {
        HistoryEventType.ExecutionFailed,
        HistoryEventType.ExecutionAborted,
        HistoryEventType.ExecutionTimedOut,
        HistoryEventType.ExecutionSucceeded,
    }
    return final_event_type in terminal_types
263

264

265
def await_execution_terminated(stepfunctions_client, execution_arn: str) -> HistoryEventList:
    """Wait until the execution's history ends with a terminal event.

    Delegates to ``await_on_execution_events`` with
    ``_is_last_history_event_terminal`` as the check and returns its result.
    """
    history = await_on_execution_events(
        stepfunctions_client=stepfunctions_client,
        execution_arn=execution_arn,
        check_func=_is_last_history_event_terminal,
    )
    return history
271

272

273
def await_execution_lists_terminated(
    stepfunctions_client, state_machine_arn: str, execution_arn: str
):
    """Poll ``list_executions`` until the execution is listed as no longer running.

    A warning is logged, and no exception is raised, if the poll does not
    succeed within 120 seconds.
    """

    def _check_last_is_terminal() -> bool:
        listing = stepfunctions_client.list_executions(stateMachineArn=state_machine_arn)
        # Only the first listed entry matching the execution ARN is considered.
        listed_execution = next(
            (entry for entry in listing["executions"] if entry["executionArn"] == execution_arn),
            None,
        )
        if listed_execution is None:
            return False
        return listed_execution["status"] != ExecutionStatus.RUNNING

    if poll_condition(
        condition=_check_last_is_terminal,
        timeout=120,
        interval=_get_sampling_interval_seconds(),
    ):
        return
    LOG.warning(
        "Timed out whilst awaiting for execution events to satisfy condition for execution '%s'.",
        execution_arn,
    )
292

293

294
def await_execution_started(stepfunctions_client, execution_arn: str) -> HistoryEventList:
    """Wait until the execution's history contains a started event.

    A started event is recognised by the ``executionStartedEventDetails`` key.

    :param stepfunctions_client: client used to fetch the execution history.
    :param execution_arn: ARN of the execution to wait for.
    :return: the events returned by ``await_on_execution_events``.
    """

    def _check_stated_exists(events: HistoryEventList) -> bool:
        # Inspect every event rather than only the first one, so the check
        # does not depend on the started event being at index 0.
        return any("executionStartedEventDetails" in event for event in events)

    return await_on_execution_events(
        stepfunctions_client=stepfunctions_client,
        execution_arn=execution_arn,
        check_func=_check_stated_exists,
    )
305

306

307
def await_execution_aborted(stepfunctions_client, execution_arn: str):
    """Poll ``describe_execution`` until the execution's status is ABORTED.

    A warning is logged, and no exception is raised, if the poll does not
    succeed within 120 seconds.
    """

    def _is_aborted() -> bool:
        description = stepfunctions_client.describe_execution(executionArn=execution_arn)
        current_status: ExecutionStatus = description["status"]
        return current_status == ExecutionStatus.ABORTED

    if poll_condition(
        condition=_is_aborted, timeout=120, interval=_get_sampling_interval_seconds()
    ):
        return
    LOG.warning("Timed out whilst awaiting for execution '%s' to abort.", execution_arn)
318

319

320
def get_expected_execution_logs(
    stepfunctions_client, log_level: LogLevel, execution_arn: LongArn
) -> HistoryEventList:
    """Return the history events of an execution that should be logged at ``log_level``.

    The execution history is fetched once and filtered with
    ``is_logging_enabled_for`` on each event's ``type``; the original order is kept.
    """
    history = stepfunctions_client.get_execution_history(executionArn=execution_arn)
    loggable_events = []
    for history_event in history["events"]:
        if is_logging_enabled_for(log_level=log_level, history_event_type=history_event["type"]):
            loggable_events.append(history_event)
    return loggable_events
331

332

333
def is_execution_logs_list_complete(
    expected_events: HistoryEventList,
) -> Callable[[HistoryEventList], bool]:
    """Build a predicate telling whether a list of log events is complete.

    The returned function accepts any input when ``expected_events`` is empty;
    otherwise it only compares the number of log events with the number of
    expected events (contents are not compared).
    """

    def _validation_function(log_events: list) -> bool:
        return not expected_events or len(log_events) == len(expected_events)

    return _validation_function
342

343

344
def _await_on_execution_log_stream_created(target_aws_client, log_group_name: str) -> str:
    """Poll a log group until it holds an execution log stream and return its name.

    On every attempt the last entry returned by ``describe_log_streams`` is
    taken as the candidate stream.

    :param target_aws_client: client factory exposing a ``logs`` client.
    :param log_group_name: name of the log group to inspect.
    :return: the name of the log stream found.
    :raises AssertionError: if ``poll_condition`` reports failure.
    """
    logs_client = target_aws_client.logs
    log_stream_name = ""

    def _run_check() -> bool:
        nonlocal log_stream_name
        try:
            log_streams = logs_client.describe_log_streams(logGroupName=log_group_name)[
                "logStreams"
            ]
        except ClientError:
            # A failed request counts as "not created yet"; the poll retries.
            return False
        if not log_streams:
            return False

        log_stream_name = log_streams[-1]["logStreamName"]
        # SFN has not yet created the log stream for the execution if only the
        # validation stream is present.
        return (
            log_stream_name != "log_stream_created_by_aws_to_validate_log_delivery_subscriptions"
        )

    # Poll outside of an `assert` statement: asserts are removed under
    # `python -O`, which would silently skip the polling altogether.
    # NOTE(review): no timeout is passed here, so poll_condition's default applies — confirm it is bounded.
    created = poll_condition(condition=_run_check)
    if not created:
        raise AssertionError(
            f"No execution log stream was found in log group '{log_group_name}'."
        )
    return log_stream_name
370

371

372
def await_on_execution_logs(
    target_aws_client,
    log_group_name: str,
    validation_function: Callable[[HistoryEventList], bool] | None = None,
) -> HistoryEventList:
    """Poll the execution log stream of a log group until its events are accepted.

    The log stream is located first; then, on every attempt, its events are
    read from the head, each ``message`` is parsed as JSON and the parsed list
    is passed to ``validation_function``.

    :param target_aws_client: client factory exposing a ``logs`` client.
    :param log_group_name: name of the log group receiving the execution logs.
    :param validation_function: predicate over the parsed log events. When
        omitted, the first successfully fetched list of events is accepted.
    :return: the parsed log events of the last polling attempt.
    :raises AssertionError: if ``poll_condition`` reports failure.
    """
    log_stream_name = _await_on_execution_log_stream_created(target_aws_client, log_group_name)

    logs_client = target_aws_client.logs
    events: HistoryEventList = []

    def _run_check() -> bool:
        # The same list object is refilled on every attempt, so the list
        # returned to the caller holds the events of the last attempt.
        events.clear()
        try:
            log_events = logs_client.get_log_events(
                logGroupName=log_group_name, logStreamName=log_stream_name, startFromHead=True
            )["events"]
            events.extend([json.loads(e["message"]) for e in log_events])
        except ClientError:
            # A failed request counts as "condition not met"; the poll retries.
            return False

        if validation_function is None:
            # No validation requested: calling None would raise a TypeError.
            return True
        return validation_function(events)

    # Poll outside of an `assert` statement: asserts are removed under
    # `python -O`, which would silently skip the polling altogether.
    validated = poll_condition(condition=_run_check)
    if not validated:
        raise AssertionError(
            f"Execution logs in log group '{log_group_name}' did not satisfy the validation."
        )
    return events
398

399

400
def create_state_machine_with_iam_role(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    snapshot,
    definition: Definition,
    logging_configuration: LoggingConfiguration | None = None,
    state_machine_name: str | None = None,
    state_machine_type: StateMachineType = StateMachineType.STANDARD,
):
    """Create an IAM role and a state machine using it, and return the state machine ARN.

    Snapshot transformers are registered for the role ARN, for request ids
    and for the created state machine's ARN. When no name is given a name
    with a ``short_uid()`` suffix is generated; ``loggingConfiguration`` is
    only sent when a logging configuration is given.
    """
    role_arn = create_state_machine_iam_role(target_aws_client=target_aws_client)
    regex_replacements = (
        (role_arn, "snf_role_arn"),
        ("Extended Request ID: [a-zA-Z0-9-/=+]+", "Extended Request ID: <extended_request_id>"),
        ("Request ID: [a-zA-Z0-9-]+", "Request ID: <request_id>"),
    )
    for pattern, replacement in regex_replacements:
        snapshot.add_transformer(RegexTransformer(pattern, replacement))

    create_arguments = {
        "name": state_machine_name or f"statemachine_create_and_record_execution_{short_uid()}",
        "definition": definition,
        "roleArn": role_arn,
        "type": state_machine_type,
    }
    if logging_configuration is not None:
        create_arguments["loggingConfiguration"] = logging_configuration

    creation_resp = create_state_machine(target_aws_client, **create_arguments)
    snapshot.add_transformer(snapshot.transform.sfn_sm_create_arn(creation_resp, 0))
    return creation_resp["stateMachineArn"]
435

436

437
def launch_and_record_execution(
    target_aws_client,
    sfn_snapshot,
    state_machine_arn,
    execution_input,
    verify_execution_description=False,
) -> LongArn:
    """Start an execution, wait for it to terminate and snapshot its history.

    :param target_aws_client: client factory exposing a ``stepfunctions`` client.
    :param sfn_snapshot: snapshot object receiving transformers and matches.
    :param state_machine_arn: ARN of the state machine to execute.
    :param execution_input: input passed to ``start_execution``.
    :param verify_execution_description: when truthy, the ``describe_execution``
        response is snapshotted as well.
    :return: the ARN of the started execution.
    """
    stepfunctions_client = target_aws_client.stepfunctions
    exec_resp = stepfunctions_client.start_execution(
        stateMachineArn=state_machine_arn, input=execution_input
    )
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_exec_arn(exec_resp, 0))
    execution_arn = exec_resp["executionArn"]

    await_execution_terminated(
        stepfunctions_client=stepfunctions_client, execution_arn=execution_arn
    )

    if verify_execution_description:
        describe_execution = stepfunctions_client.describe_execution(executionArn=execution_arn)
        sfn_snapshot.match("describe_execution", describe_execution)

    get_execution_history = stepfunctions_client.get_execution_history(executionArn=execution_arn)

    # Transform all map runs if any.
    try:
        map_run_arns = extract_json("$..mapRunArn", get_execution_history)
        if isinstance(map_run_arns, str):
            map_run_arns = [map_run_arns]
        # De-duplicate while keeping first-seen order: a set would make the
        # index given to each map run ARN depend on string hash order, which
        # can change between interpreter runs.
        for i, map_run_arn in enumerate(dict.fromkeys(map_run_arns)):
            sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_map_run_arn(map_run_arn, i))
    except NoSuchJsonPathError:
        # No mapRunArns
        pass

    sfn_snapshot.match("get_execution_history", get_execution_history)

    return execution_arn
475

476

477
def launch_and_record_mocked_execution(
    target_aws_client,
    sfn_snapshot,
    state_machine_arn,
    execution_input,
    test_name,
) -> LongArn:
    """Start a mocked test-case execution, wait for it to terminate and snapshot its history.

    The execution is started against ``"<state_machine_arn>#<test_name>"``.

    :param target_aws_client: client factory exposing a ``stepfunctions`` client.
    :param sfn_snapshot: snapshot object receiving transformers and matches.
    :param state_machine_arn: ARN of the state machine to execute.
    :param execution_input: input passed to ``start_execution``.
    :param test_name: test case name appended to the state machine ARN.
    :return: the ARN of the started execution.
    """
    stepfunctions_client = target_aws_client.stepfunctions
    exec_resp = stepfunctions_client.start_execution(
        stateMachineArn=f"{state_machine_arn}#{test_name}", input=execution_input
    )
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_exec_arn(exec_resp, 0))
    execution_arn = exec_resp["executionArn"]

    await_execution_terminated(
        stepfunctions_client=stepfunctions_client, execution_arn=execution_arn
    )

    get_execution_history = stepfunctions_client.get_execution_history(executionArn=execution_arn)

    # Transform all map runs if any.
    try:
        map_run_arns = extract_json("$..mapRunArn", get_execution_history)
        if isinstance(map_run_arns, str):
            map_run_arns = [map_run_arns]
        # De-duplicate while keeping first-seen order: a set would make the
        # index given to each map run ARN depend on string hash order, which
        # can change between interpreter runs.
        for i, map_run_arn in enumerate(dict.fromkeys(map_run_arns)):
            sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_map_run_arn(map_run_arn, i))
    except NoSuchJsonPathError:
        # No mapRunArns
        pass

    sfn_snapshot.match("get_execution_history", get_execution_history)

    return execution_arn
511

512

513
def launch_and_record_mocked_sync_execution(
    target_aws_client,
    sfn_snapshot,
    state_machine_arn,
    execution_input,
    test_name,
) -> LongArn:
    """Run a mocked test case synchronously and snapshot the response.

    The execution is started with ``start_sync_execution`` against
    ``"<state_machine_arn>#<test_name>"``; the execution ARN of the response
    is returned.
    """
    test_case_arn = f"{state_machine_arn}#{test_name}"
    sync_response = target_aws_client.stepfunctions.start_sync_execution(
        stateMachineArn=test_case_arn,
        input=execution_input,
    )

    arn_transformer = sfn_snapshot.transform.sfn_sm_sync_exec_arn(sync_response, 0)
    sfn_snapshot.add_transformer(arn_transformer)
    sfn_snapshot.match("start_execution_sync_response", sync_response)

    return sync_response["executionArn"]
532

533

534
def launch_and_record_logs(
    target_aws_client,
    state_machine_arn,
    execution_input,
    log_level,
    log_group_name,
    sfn_snapshot,
):
    """Run an execution and snapshot the events it logged to the log group.

    Returns early, without reading any logs, when the log level is OFF or no
    history event is expected to be logged at ``log_level``.
    """
    execution_arn = launch_and_record_execution(
        target_aws_client=target_aws_client,
        sfn_snapshot=sfn_snapshot,
        state_machine_arn=state_machine_arn,
        execution_input=execution_input,
    )
    expected_events = get_expected_execution_logs(
        stepfunctions_client=target_aws_client.stepfunctions,
        log_level=log_level,
        execution_arn=execution_arn,
    )

    if log_level == LogLevel.OFF or not expected_events:
        # The test should terminate here, as no log streams for this execution would have been created.
        return

    logged_execution_events = await_on_execution_logs(
        target_aws_client,
        log_group_name,
        is_execution_logs_list_complete(expected_events),
    )

    timestamp_transformer = JsonpathTransformer(
        jsonpath="$..event_timestamp",
        replacement="timestamp",
        replace_reference=False,
    )
    sfn_snapshot.add_transformer(timestamp_transformer)
    sfn_snapshot.match("logged_execution_events", logged_execution_events)
569

570

571
def create_and_record_execution(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    sfn_snapshot,
    definition,
    execution_input,
    verify_execution_description=False,
) -> LongArn:
    """Create a state machine, run one execution of it and return the execution ARN.

    Combines ``create_state_machine_with_iam_role`` and
    ``launch_and_record_execution``.
    """
    state_machine_arn = create_state_machine_with_iam_role(
        target_aws_client=target_aws_client,
        create_state_machine_iam_role=create_state_machine_iam_role,
        create_state_machine=create_state_machine,
        snapshot=sfn_snapshot,
        definition=definition,
    )
    return launch_and_record_execution(
        target_aws_client=target_aws_client,
        sfn_snapshot=sfn_snapshot,
        state_machine_arn=state_machine_arn,
        execution_input=execution_input,
        verify_execution_description=verify_execution_description,
    )
595

596

597
def create_and_record_mocked_execution(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    sfn_snapshot,
    definition,
    execution_input,
    state_machine_name,
    test_name,
    state_machine_type: StateMachineType = StateMachineType.STANDARD,
) -> LongArn:
    """Create a named state machine, run a mocked test case and return the execution ARN.

    Combines ``create_state_machine_with_iam_role`` and
    ``launch_and_record_mocked_execution``.
    """
    state_machine_arn = create_state_machine_with_iam_role(
        target_aws_client=target_aws_client,
        create_state_machine_iam_role=create_state_machine_iam_role,
        create_state_machine=create_state_machine,
        snapshot=sfn_snapshot,
        definition=definition,
        state_machine_name=state_machine_name,
        state_machine_type=state_machine_type,
    )
    return launch_and_record_mocked_execution(
        target_aws_client=target_aws_client,
        sfn_snapshot=sfn_snapshot,
        state_machine_arn=state_machine_arn,
        execution_input=execution_input,
        test_name=test_name,
    )
621

622

623
def create_and_record_mocked_sync_execution(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    sfn_snapshot,
    definition,
    execution_input,
    state_machine_name,
    test_name,
) -> LongArn:
    """Create a named EXPRESS state machine, run a mocked test case synchronously and return the execution ARN.

    Combines ``create_state_machine_with_iam_role`` and
    ``launch_and_record_mocked_sync_execution``.
    """
    state_machine_arn = create_state_machine_with_iam_role(
        target_aws_client=target_aws_client,
        create_state_machine_iam_role=create_state_machine_iam_role,
        create_state_machine=create_state_machine,
        snapshot=sfn_snapshot,
        definition=definition,
        state_machine_name=state_machine_name,
        state_machine_type=StateMachineType.EXPRESS,
    )
    return launch_and_record_mocked_sync_execution(
        target_aws_client=target_aws_client,
        sfn_snapshot=sfn_snapshot,
        state_machine_arn=state_machine_arn,
        execution_input=execution_input,
        test_name=test_name,
    )
646

647

648
def create_and_run_mock(
    target_aws_client,
    monkeypatch,
    mock_config_file,
    mock_config: dict,
    state_machine_name: str,
    definition_template: dict,
    execution_input: str,
    test_name: str,
):
    """Run one mocked test case against a temporary state machine.

    The mock configuration is written through ``mock_config_file`` and its
    path is patched into ``config.SFN_MOCK_CONFIG``. A state machine is then
    created, the test case ``"<state_machine_arn>#<test_name>"`` is executed
    and awaited, and the state machine is deleted.

    :param target_aws_client: client factory exposing a ``stepfunctions`` client.
    :param monkeypatch: object whose ``setattr`` patches the configuration.
    :param mock_config_file: callable turning ``mock_config`` into a file path.
    :param mock_config: mock configuration passed to ``mock_config_file``.
    :param state_machine_name: name of the state machine; a name with a
        ``short_uid()`` suffix is generated when falsy.
    :param definition_template: state machine definition, serialised with ``json.dumps``.
    :param execution_input: input passed to ``start_execution``.
    :param test_name: mock test case name appended to the state machine ARN.
    :return: the ARN of the started execution.
    """
    mock_config_file_path = mock_config_file(mock_config)
    monkeypatch.setattr(config, "SFN_MOCK_CONFIG", mock_config_file_path)

    sfn_client = target_aws_client.stepfunctions

    state_machine_name: str = state_machine_name or f"mocked_statemachine_{short_uid()}"
    definition = json.dumps(definition_template)
    creation_response = sfn_client.create_state_machine(
        name=state_machine_name,
        definition=definition,
        roleArn="arn:aws:iam::111111111111:role/mock-role/mocked-run",
    )
    state_machine_arn = creation_response["stateMachineArn"]

    try:
        test_case_arn = f"{state_machine_arn}#{test_name}"
        execution = sfn_client.start_execution(
            stateMachineArn=test_case_arn, input=execution_input
        )
        execution_arn = execution["executionArn"]

        await_execution_terminated(stepfunctions_client=sfn_client, execution_arn=execution_arn)
    finally:
        # Delete the state machine even when starting or awaiting the
        # execution raises, so a failing run does not leave it behind.
        sfn_client.delete_state_machine(stateMachineArn=state_machine_arn)

    return execution_arn
680

681

682
def create_and_record_logs(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    sfn_create_log_group,
    sfn_snapshot,
    definition,
    execution_input,
    log_level: LogLevel,
    include_execution_data: bool,
):
    """Create a state machine, attach CloudWatch logging to it and record an execution's logs.

    The state machine is created first; a log group is then created and set
    as the logging destination through ``update_state_machine`` before
    ``launch_and_record_logs`` is called.
    """
    state_machine_arn = create_state_machine_with_iam_role(
        target_aws_client=target_aws_client,
        create_state_machine_iam_role=create_state_machine_iam_role,
        create_state_machine=create_state_machine,
        snapshot=sfn_snapshot,
        definition=definition,
    )

    log_group_name = sfn_create_log_group()
    described_groups = target_aws_client.logs.describe_log_groups(
        logGroupNamePrefix=log_group_name
    )
    # The first group returned for the name prefix is used.
    log_group_arn = described_groups["logGroups"][0]["arn"]

    log_destination = LogDestination(
        cloudWatchLogsLogGroup=CloudWatchLogsLogGroup(logGroupArn=log_group_arn)
    )
    logging_configuration = LoggingConfiguration(
        level=log_level,
        includeExecutionData=include_execution_data,
        destinations=[log_destination],
    )
    target_aws_client.stepfunctions.update_state_machine(
        stateMachineArn=state_machine_arn, loggingConfiguration=logging_configuration
    )

    launch_and_record_logs(
        target_aws_client=target_aws_client,
        state_machine_arn=state_machine_arn,
        execution_input=execution_input,
        log_level=log_level,
        log_group_name=log_group_name,
        sfn_snapshot=sfn_snapshot,
    )
726

727

728
def launch_and_record_sync_execution(
    target_aws_client,
    sfn_snapshot,
    state_machine_arn,
    execution_input,
):
    """Run a synchronous execution and snapshot its response.

    A transformer for the execution ARN is registered before the response is
    matched under ``start_execution_sync_response``. Nothing is returned.
    """
    sfn_client = target_aws_client.stepfunctions
    sync_response = sfn_client.start_sync_execution(
        stateMachineArn=state_machine_arn,
        input=execution_input,
    )
    arn_transformer = sfn_snapshot.transform.sfn_sm_sync_exec_arn(sync_response, 0)
    sfn_snapshot.add_transformer(arn_transformer)
    sfn_snapshot.match("start_execution_sync_response", sync_response)
740

741

742
def create_and_record_express_sync_execution(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    sfn_snapshot,
    definition,
    execution_input,
):
    """Create an EXPRESS state machine, snapshot its creation and run it synchronously.

    The creation response is matched under ``creation_response``; the
    execution itself is recorded by ``launch_and_record_sync_execution``.
    """
    role_arn = create_state_machine_iam_role(target_aws_client=target_aws_client)
    sfn_snapshot.add_transformer(RegexTransformer(role_arn, "sfn_role_arn"))

    machine_name = f"express_statemachine_{short_uid()}"
    creation_response = create_state_machine(
        target_aws_client,
        name=machine_name,
        definition=definition,
        roleArn=role_arn,
        type=StateMachineType.EXPRESS,
    )
    state_machine_arn = creation_response["stateMachineArn"]
    create_arn_transformer = sfn_snapshot.transform.sfn_sm_create_arn(creation_response, 0)
    sfn_snapshot.add_transformer(create_arn_transformer)
    sfn_snapshot.match("creation_response", creation_response)

    launch_and_record_sync_execution(
        target_aws_client=target_aws_client,
        sfn_snapshot=sfn_snapshot,
        state_machine_arn=state_machine_arn,
        execution_input=execution_input,
    )
770

771

772
def launch_and_record_express_async_execution(
    target_aws_client,
    sfn_snapshot,
    state_machine_arn,
    log_group_name,
    execution_input,
):
    """Start an execution and snapshot the last event it logged to the log group.

    The logs are awaited until their last event is terminal according to
    ``_is_last_history_event_terminal``. Returns the execution ARN.
    """
    start_response = target_aws_client.stepfunctions.start_execution(
        stateMachineArn=state_machine_arn, input=execution_input
    )
    exec_arn_transformer = sfn_snapshot.transform.sfn_sm_express_exec_arn(start_response, 0)
    sfn_snapshot.add_transformer(exec_arn_transformer)
    execution_arn = start_response["executionArn"]

    logged_events = await_on_execution_logs(
        target_aws_client, log_group_name, validation_function=_is_last_history_event_terminal
    )
    # Snapshot only the end event, as AWS StepFunctions implements a flaky approach to logging previous events.
    sfn_snapshot.match("end_event", logged_events[-1])

    return execution_arn
793

794

795
def create_and_record_express_async_execution(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    sfn_create_log_group,
    sfn_snapshot,
    definition,
    execution_input,
    include_execution_data: bool = True,
) -> tuple[LongArn, LongArn]:
    """Create an EXPRESS state machine logging at level ALL and record one asynchronous execution.

    The creation response is matched under ``creation_response``; the
    execution is recorded by ``launch_and_record_express_async_execution``.
    Returns the state machine ARN and the execution ARN, in that order.
    """
    role_arn = create_state_machine_iam_role(target_aws_client)
    sfn_snapshot.add_transformer(RegexTransformer(role_arn, "sfn_role_arn"))

    log_group_name = sfn_create_log_group()
    described_groups = target_aws_client.logs.describe_log_groups(
        logGroupNamePrefix=log_group_name
    )
    # The first group returned for the name prefix is used.
    log_group_arn = described_groups["logGroups"][0]["arn"]
    log_destination = LogDestination(
        cloudWatchLogsLogGroup=CloudWatchLogsLogGroup(logGroupArn=log_group_arn)
    )
    logging_configuration = LoggingConfiguration(
        level=LogLevel.ALL,
        includeExecutionData=include_execution_data,
        destinations=[log_destination],
    )

    machine_name = f"express_statemachine_{short_uid()}"
    creation_response = create_state_machine(
        target_aws_client,
        name=machine_name,
        definition=definition,
        roleArn=role_arn,
        type=StateMachineType.EXPRESS,
        loggingConfiguration=logging_configuration,
    )
    state_machine_arn = creation_response["stateMachineArn"]
    create_arn_transformer = sfn_snapshot.transform.sfn_sm_create_arn(creation_response, 0)
    sfn_snapshot.add_transformer(create_arn_transformer)
    sfn_snapshot.match("creation_response", creation_response)

    execution_arn = launch_and_record_express_async_execution(
        target_aws_client=target_aws_client,
        sfn_snapshot=sfn_snapshot,
        state_machine_arn=state_machine_arn,
        log_group_name=log_group_name,
        execution_input=execution_input,
    )
    return state_machine_arn, execution_arn
842

843

844
def create_and_record_events(
    create_state_machine_iam_role,
    create_state_machine,
    sfn_events_to_sqs_queue,
    target_aws_client,
    sfn_snapshot,
    definition,
    execution_input,
):
    """
    Create a state machine, run one execution to termination, collect the messages delivered
    to the SQS queue returned by ``sfn_events_to_sqs_queue``, and snapshot them.

    :param create_state_machine_iam_role: callable returning the role ARN for the given client.
    :param create_state_machine: callable creating the state machine and returning the response.
    :param sfn_events_to_sqs_queue: callable returning a queue URL for a state machine ARN.
    :param target_aws_client: client factory; its ``stepfunctions`` and ``sqs`` clients are used.
    :param sfn_snapshot: snapshot object receiving the transformers and the collected events.
    :param definition: state machine definition forwarded to ``create_state_machine``.
    :param execution_input: input passed to ``start_execution``.
    """
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sqs_integration())

    # NOTE(review): the last replacement is a plain string, not an f-string, so the braces are
    # emitted literally; kept as-is because recorded snapshots may rely on it - confirm.
    jsonpath_replacements = (
        ("$..detail.startDate", "start-date"),
        ("$..detail.stopDate", "stop-date"),
        ("$..detail.name", "test_event_bridge_events-{short_uid()}"),
    )
    sfn_snapshot.add_transformers_list(
        [
            JsonpathTransformer(
                jsonpath=path,
                replacement=placeholder,
                replace_reference=False,
            )
            for path, placeholder in jsonpath_replacements
        ]
    )

    role_arn = create_state_machine_iam_role(target_aws_client)
    state_machine_name = f"test_event_bridge_events-{short_uid()}"
    create_output: CreateStateMachineOutput = create_state_machine(
        target_aws_client,
        name=state_machine_name,
        definition=definition,
        roleArn=role_arn,
    )
    state_machine_arn = create_output["stateMachineArn"]

    queue_url = sfn_events_to_sqs_queue(state_machine_arn=state_machine_arn)

    sfn_client = target_aws_client.stepfunctions
    execution_arn = sfn_client.start_execution(
        stateMachineArn=state_machine_arn, input=execution_input
    )["executionArn"]
    await_execution_terminated(
        stepfunctions_client=target_aws_client.stepfunctions, execution_arn=execution_arn
    )

    stepfunctions_events = []

    def _poll_queue():
        # Accumulate across polls: each call adds whatever messages are returned this time.
        response = target_aws_client.sqs.receive_message(QueueUrl=queue_url)
        stepfunctions_events.extend(
            json.loads(message["Body"]) for message in response.get("Messages", [])
        )
        stepfunctions_events.sort(key=lambda item: item["time"])
        # Done once the latest event (by time) no longer reports a RUNNING status.
        return stepfunctions_events and stepfunctions_events[-1]["detail"]["status"] != "RUNNING"

    poll_condition(_poll_queue, timeout=60)

    sfn_snapshot.match("stepfunctions_events", stepfunctions_events)
1✔
906

907

908
def record_sqs_events(target_aws_client, queue_url, sfn_snapshot, num_events):
    """
    Poll an SQS queue until exactly ``num_events`` message bodies were collected, then snapshot
    them ordered by the JSON dump of their ``detail`` field.

    :param target_aws_client: client factory; its ``sqs`` client is used.
    :param queue_url: URL of the queue to receive from.
    :param sfn_snapshot: snapshot object that records the collected events.
    :param num_events: number of events to wait for.
    """
    stepfunctions_events = []

    def _poll_queue():
        # Accumulate across polls: each call adds whatever messages are returned this time.
        response = target_aws_client.sqs.receive_message(QueueUrl=queue_url)
        stepfunctions_events.extend(
            json.loads(message["Body"]) for message in response.get("Messages", [])
        )
        stepfunctions_events.sort(key=lambda item: item["time"])
        return len(stepfunctions_events) == num_events

    poll_condition(_poll_queue, timeout=60)

    # Final ordering is by serialised detail rather than by time.
    stepfunctions_events.sort(key=lambda item: json.dumps(item.get("detail", {})))
    sfn_snapshot.match("stepfunctions_events", stepfunctions_events)
1✔
922

923

924
class SfnNoneRecursiveParallelTransformer:
    """
    Normalises a sublist of events triggered by a Parallel state to be order-independent.

    Events found between a ``ParallelStateStarted`` event and the next Parallel terminal event
    get their ``id`` and ``previousEventId`` overwritten and are sorted by their JSON
    serialisation. A further ``ParallelStateStarted`` restarts the collection, i.e. nested
    Parallel states are not handled recursively.
    """

    def __init__(self, events_jsonpath: str = "$..events"):
        """
        :param events_jsonpath: JSONPath expression locating the event lists to normalise.
        """
        self.events_jsonpath: str = events_jsonpath

    @staticmethod
    def _normalise_events(events: list[dict]) -> None:
        """
        Normalise, in place, the events emitted inside a Parallel state.

        :param events: list of history event dicts; it is mutated.
        """
        # Event types that close the sublist opened by ParallelStateStarted.
        # A missing type (None) is handled by the first branch below, so it is not listed here.
        parallel_terminal_types = {
            HistoryEventType.ParallelStateSucceeded,
            HistoryEventType.ParallelStateAborted,
            HistoryEventType.ParallelStateExited,
            HistoryEventType.ParallelStateFailed,
        }

        start_idx = None
        sublist = []
        in_sublist = False
        for i, event in enumerate(events):
            event_type = event.get("type")
            if event_type is None:
                LOG.debug(
                    "No 'type' in event item '%s'.",
                    event,
                )
                in_sublist = False

            elif event_type in parallel_terminal_types:
                # Write the collected events back in a deterministic order.
                events[start_idx:i] = sorted(sublist, key=to_json_str)
                in_sublist = False
            elif event_type == HistoryEventType.ParallelStateStarted:
                in_sublist = True
                sublist = []
                start_idx = i + 1
            elif in_sublist:
                # Blank out the identifiers so that sorting does not depend on them.
                # NOTE(review): 'id' is set to a one-element tuple while 'previousEventId' is
                # an int; kept unchanged as recorded snapshots may rely on it - confirm.
                event["id"] = (0,)
                event["previousEventId"] = 0
                sublist.append(event)

    def transform(self, input_data: dict, *, ctx: TransformContext) -> dict:
        """
        Normalise every event list matched by ``self.events_jsonpath`` within ``input_data``.

        :param input_data: data to transform; matched event lists are mutated in place.
        :param ctx: transform context; not used by this transformer.
        :return: the same ``input_data`` object.
        """
        # Honour the configured path instead of a hard-coded "$..events".
        pattern = parse(self.events_jsonpath)
        events = pattern.find(input_data)
        if not events:
            LOG.debug("No Stepfunctions 'events' for jsonpath '%s'.", self.events_jsonpath)
            return input_data

        for events_data in events:
            self._normalise_events(events_data.value)

        return input_data
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc