• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 18363705737

08 Oct 2025 07:39PM UTC coverage: 86.912% (+0.02%) from 86.893%
18363705737

push

github

web-flow
Fix Event Bridge input transformation of booleans (#13236)

6 of 6 new or added lines in 1 file covered. (100.0%)

145 existing lines in 8 files now uncovered.

67991 of 78230 relevant lines covered (86.91%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.18
/localstack-core/localstack/testing/pytest/stepfunctions/utils.py
1
import json
1✔
2
import logging
1✔
3
from typing import Callable, Final, Optional
1✔
4

5
from botocore.exceptions import ClientError
1✔
6
from jsonpath_ng.ext import parse
1✔
7
from localstack_snapshot.snapshots.transformer import (
1✔
8
    JsonpathTransformer,
9
    RegexTransformer,
10
    TransformContext,
11
)
12

13
from localstack import config
1✔
14
from localstack.aws.api.stepfunctions import (
1✔
15
    Arn,
16
    CloudWatchLogsLogGroup,
17
    CreateStateMachineOutput,
18
    Definition,
19
    ExecutionStatus,
20
    HistoryEventList,
21
    HistoryEventType,
22
    LogDestination,
23
    LoggingConfiguration,
24
    LogLevel,
25
    LongArn,
26
    StateMachineType,
27
)
28
from localstack.services.stepfunctions.asl.eval.event.logging import is_logging_enabled_for
1✔
29
from localstack.services.stepfunctions.asl.utils.encoding import to_json_str
1✔
30
from localstack.services.stepfunctions.asl.utils.json_path import NoSuchJsonPathError, extract_json
1✔
31
from localstack.testing.aws.util import is_aws_cloud
1✔
32
from localstack.utils.strings import short_uid
1✔
33
from localstack.utils.sync import poll_condition
1✔
34

35
LOG = logging.getLogger(__name__)
1✔
36

37

38
# For EXPRESS state machines, the deletion will happen eventually (usually less than a minute).
39
# Running executions may emit logs after DeleteStateMachine API is called.
40
_DELETION_TIMEOUT_SECS: Final[int] = 120
1✔
41
_SAMPLING_INTERVAL_SECONDS_AWS_CLOUD: Final[int] = 1
1✔
42
_SAMPLING_INTERVAL_SECONDS_LOCALSTACK: Final[float] = 0.2
1✔
43

44

45
def _get_sampling_interval_seconds() -> int | float:
1✔
46
    return (
1✔
47
        _SAMPLING_INTERVAL_SECONDS_AWS_CLOUD
48
        if is_aws_cloud()
49
        else _SAMPLING_INTERVAL_SECONDS_LOCALSTACK
50
    )
51

52

53
def await_no_state_machines_listed(stepfunctions_client):
1✔
54
    def _is_empty_state_machine_list():
×
55
        lst_resp = stepfunctions_client.list_state_machines()
×
56
        state_machines = lst_resp["stateMachines"]
×
57
        return not bool(state_machines)
×
58

59
    success = poll_condition(
×
60
        condition=_is_empty_state_machine_list,
61
        timeout=_DELETION_TIMEOUT_SECS,
62
        interval=_get_sampling_interval_seconds(),
63
    )
64
    if not success:
×
65
        LOG.warning("Timed out whilst awaiting for listing to be empty.")
×
66

67

68
def _is_state_machine_alias_listed(
1✔
69
    stepfunctions_client, state_machine_arn: Arn, state_machine_alias_arn: Arn
70
):
71
    list_state_machine_aliases_list = stepfunctions_client.list_state_machine_aliases(
1✔
72
        stateMachineArn=state_machine_arn
73
    )
74
    state_machine_aliases = list_state_machine_aliases_list["stateMachineAliases"]
1✔
75
    for state_machine_alias in state_machine_aliases:
1✔
76
        if state_machine_alias["stateMachineAliasArn"] == state_machine_alias_arn:
1✔
77
            return True
1✔
78
    return False
1✔
79

80

81
def await_state_machine_alias_is_created(
1✔
82
    stepfunctions_client, state_machine_arn: Arn, state_machine_alias_arn: Arn
83
):
84
    success = poll_condition(
1✔
85
        condition=lambda: _is_state_machine_alias_listed(
86
            stepfunctions_client=stepfunctions_client,
87
            state_machine_arn=state_machine_arn,
88
            state_machine_alias_arn=state_machine_alias_arn,
89
        ),
90
        timeout=_DELETION_TIMEOUT_SECS,
91
        interval=_get_sampling_interval_seconds(),
92
    )
93
    if not success:
1✔
94
        LOG.warning("Timed out whilst awaiting for listing to be empty.")
×
95

96

97
def await_state_machine_alias_is_deleted(
1✔
98
    stepfunctions_client, state_machine_arn: Arn, state_machine_alias_arn: Arn
99
):
100
    success = poll_condition(
1✔
101
        condition=lambda: not _is_state_machine_alias_listed(
102
            stepfunctions_client=stepfunctions_client,
103
            state_machine_arn=state_machine_arn,
104
            state_machine_alias_arn=state_machine_alias_arn,
105
        ),
106
        timeout=_DELETION_TIMEOUT_SECS,
107
        interval=_get_sampling_interval_seconds(),
108
    )
109
    if not success:
1✔
110
        LOG.warning("Timed out whilst awaiting for listing to be empty.")
×
111

112

113
def _is_state_machine_listed(stepfunctions_client, state_machine_arn: str) -> bool:
1✔
114
    lst_resp = stepfunctions_client.list_state_machines()
1✔
115
    state_machines = lst_resp["stateMachines"]
1✔
116
    for state_machine in state_machines:
1✔
117
        if state_machine["stateMachineArn"] == state_machine_arn:
1✔
118
            return True
1✔
119
    return False
1✔
120

121

122
def _is_state_machine_version_listed(
1✔
123
    stepfunctions_client, state_machine_arn: str, state_machine_version_arn: str
124
) -> bool:
125
    lst_resp = stepfunctions_client.list_state_machine_versions(stateMachineArn=state_machine_arn)
1✔
126
    versions = lst_resp["stateMachineVersions"]
1✔
127
    for version in versions:
1✔
128
        if version["stateMachineVersionArn"] == state_machine_version_arn:
1✔
129
            return True
1✔
130
    return False
1✔
131

132

133
def await_state_machine_not_listed(stepfunctions_client, state_machine_arn: str):
1✔
134
    success = poll_condition(
1✔
135
        condition=lambda: not _is_state_machine_listed(stepfunctions_client, state_machine_arn),
136
        timeout=_DELETION_TIMEOUT_SECS,
137
        interval=_get_sampling_interval_seconds(),
138
    )
139
    if not success:
1✔
140
        LOG.warning("Timed out whilst awaiting for listing to exclude '%s'.", state_machine_arn)
×
141

142

143
def await_state_machine_listed(stepfunctions_client, state_machine_arn: str):
1✔
144
    success = poll_condition(
1✔
145
        condition=lambda: _is_state_machine_listed(stepfunctions_client, state_machine_arn),
146
        timeout=_DELETION_TIMEOUT_SECS,
147
        interval=_get_sampling_interval_seconds(),
148
    )
149
    if not success:
1✔
150
        LOG.warning("Timed out whilst awaiting for listing to include '%s'.", state_machine_arn)
×
151

152

153
def await_state_machine_version_not_listed(
1✔
154
    stepfunctions_client, state_machine_arn: str, state_machine_version_arn: str
155
):
156
    success = poll_condition(
1✔
157
        condition=lambda: not _is_state_machine_version_listed(
158
            stepfunctions_client, state_machine_arn, state_machine_version_arn
159
        ),
160
        timeout=_DELETION_TIMEOUT_SECS,
161
        interval=_get_sampling_interval_seconds(),
162
    )
163
    if not success:
1✔
164
        LOG.warning(
×
165
            "Timed out whilst awaiting for version of %s to exclude '%s'.",
166
            state_machine_arn,
167
            state_machine_version_arn,
168
        )
169

170

171
def await_state_machine_version_listed(
1✔
172
    stepfunctions_client, state_machine_arn: str, state_machine_version_arn: str
173
):
174
    success = poll_condition(
1✔
175
        condition=lambda: _is_state_machine_version_listed(
176
            stepfunctions_client, state_machine_arn, state_machine_version_arn
177
        ),
178
        timeout=_DELETION_TIMEOUT_SECS,
179
        interval=_get_sampling_interval_seconds(),
180
    )
181
    if not success:
1✔
182
        LOG.warning(
×
183
            "Timed out whilst awaiting for version of %s to include '%s'.",
184
            state_machine_arn,
185
            state_machine_version_arn,
186
        )
187

188

189
def await_on_execution_events(
1✔
190
    stepfunctions_client, execution_arn: str, check_func: Callable[[HistoryEventList], bool]
191
) -> HistoryEventList:
192
    events: HistoryEventList = []
1✔
193

194
    def _run_check():
1✔
195
        nonlocal events
196
        events.clear()
1✔
197
        try:
1✔
198
            hist_resp = stepfunctions_client.get_execution_history(executionArn=execution_arn)
1✔
199
        except ClientError:
×
200
            return False
×
201
        events.extend(sorted(hist_resp.get("events", []), key=lambda event: event.get("timestamp")))
1✔
202
        res: bool = check_func(events)
1✔
203
        return res
1✔
204

205
    assert poll_condition(
1✔
206
        condition=_run_check, timeout=120, interval=_get_sampling_interval_seconds()
207
    )
208
    return events
1✔
209

210

211
def await_execution_success(stepfunctions_client, execution_arn: str) -> HistoryEventList:
1✔
212
    def _check_last_is_success(events: HistoryEventList) -> bool:
1✔
213
        if len(events) > 0:
1✔
214
            last_event = events[-1]
1✔
215
            return "executionSucceededEventDetails" in last_event
1✔
216
        return False
1✔
217

218
    return await_on_execution_events(
1✔
219
        stepfunctions_client=stepfunctions_client,
220
        execution_arn=execution_arn,
221
        check_func=_check_last_is_success,
222
    )
223

224

225
def await_list_execution_status(
1✔
226
    stepfunctions_client, state_machine_arn: str, execution_arn: str, status: str
227
):
228
    """required as there is some eventual consistency in list_executions vs describe_execution and get_execution_history"""
229

230
    def _run_check():
1✔
231
        list_resp = stepfunctions_client.list_executions(
1✔
232
            stateMachineArn=state_machine_arn, statusFilter=status
233
        )
234
        for execution in list_resp.get("executions", []):
1✔
235
            if execution["executionArn"] != execution_arn or execution["status"] != status:
1✔
236
                continue
×
237
            return True
1✔
UNCOV
238
        return False
×
239

240
    success = poll_condition(
1✔
241
        condition=_run_check, timeout=120, interval=_get_sampling_interval_seconds()
242
    )
243
    if not success:
1✔
244
        LOG.warning(
×
245
            "Timed out whilst awaiting for execution status %s to satisfy condition for execution '%s'.",
246
            status,
247
            execution_arn,
248
        )
249

250

251
def _is_last_history_event_terminal(events: HistoryEventList) -> bool:
1✔
252
    if len(events) > 0:
1✔
253
        last_event = events[-1]
1✔
254
        last_event_type = last_event.get("type")
1✔
255
        return last_event_type is None or last_event_type in {
1✔
256
            HistoryEventType.ExecutionFailed,
257
            HistoryEventType.ExecutionAborted,
258
            HistoryEventType.ExecutionTimedOut,
259
            HistoryEventType.ExecutionSucceeded,
260
        }
261
    return False
1✔
262

263

264
def await_execution_terminated(stepfunctions_client, execution_arn: str) -> HistoryEventList:
1✔
265
    return await_on_execution_events(
1✔
266
        stepfunctions_client=stepfunctions_client,
267
        execution_arn=execution_arn,
268
        check_func=_is_last_history_event_terminal,
269
    )
270

271

272
def await_execution_lists_terminated(
1✔
273
    stepfunctions_client, state_machine_arn: str, execution_arn: str
274
):
275
    def _check_last_is_terminal() -> bool:
1✔
276
        list_output = stepfunctions_client.list_executions(stateMachineArn=state_machine_arn)
1✔
277
        executions = list_output["executions"]
1✔
278
        for execution in executions:
1✔
279
            if execution["executionArn"] == execution_arn:
1✔
280
                return execution["status"] != ExecutionStatus.RUNNING
1✔
281
        return False
×
282

283
    success = poll_condition(
1✔
284
        condition=_check_last_is_terminal, timeout=120, interval=_get_sampling_interval_seconds()
285
    )
286
    if not success:
1✔
287
        LOG.warning(
×
288
            "Timed out whilst awaiting for execution events to satisfy condition for execution '%s'.",
289
            execution_arn,
290
        )
291

292

293
def await_execution_started(stepfunctions_client, execution_arn: str) -> HistoryEventList:
1✔
294
    def _check_stated_exists(events: HistoryEventList) -> bool:
1✔
295
        for event in events:
1✔
296
            return "executionStartedEventDetails" in event
1✔
297
        return False
×
298

299
    return await_on_execution_events(
1✔
300
        stepfunctions_client=stepfunctions_client,
301
        execution_arn=execution_arn,
302
        check_func=_check_stated_exists,
303
    )
304

305

306
def await_execution_aborted(stepfunctions_client, execution_arn: str):
1✔
307
    def _run_check():
1✔
308
        desc_res = stepfunctions_client.describe_execution(executionArn=execution_arn)
1✔
309
        status: ExecutionStatus = desc_res["status"]
1✔
310
        return status == ExecutionStatus.ABORTED
1✔
311

312
    success = poll_condition(
1✔
313
        condition=_run_check, timeout=120, interval=_get_sampling_interval_seconds()
314
    )
315
    if not success:
1✔
316
        LOG.warning("Timed out whilst awaiting for execution '%s' to abort.", execution_arn)
×
317

318

319
def get_expected_execution_logs(
1✔
320
    stepfunctions_client, log_level: LogLevel, execution_arn: LongArn
321
) -> HistoryEventList:
322
    execution_history = stepfunctions_client.get_execution_history(executionArn=execution_arn)
1✔
323
    execution_history_events = execution_history["events"]
1✔
324
    expected_events = [
1✔
325
        event
326
        for event in execution_history_events
327
        if is_logging_enabled_for(log_level=log_level, history_event_type=event["type"])
328
    ]
329
    return expected_events
1✔
330

331

332
def is_execution_logs_list_complete(
1✔
333
    expected_events: HistoryEventList,
334
) -> Callable[[HistoryEventList], bool]:
335
    def _validation_function(log_events: list) -> bool:
1✔
336
        if not expected_events:
1✔
337
            return True
×
338
        return len(expected_events) == len(log_events)
1✔
339

340
    return _validation_function
1✔
341

342

343
def _await_on_execution_log_stream_created(target_aws_client, log_group_name: str) -> str:
1✔
344
    logs_client = target_aws_client.logs
1✔
345
    log_stream_name = ""
1✔
346

347
    def _run_check():
1✔
348
        nonlocal log_stream_name
349
        try:
1✔
350
            log_streams = logs_client.describe_log_streams(logGroupName=log_group_name)[
1✔
351
                "logStreams"
352
            ]
353
            if not log_streams:
1✔
354
                return False
×
355

356
            log_stream_name = log_streams[-1]["logStreamName"]
1✔
357
            if (
1✔
358
                log_stream_name
359
                == "log_stream_created_by_aws_to_validate_log_delivery_subscriptions"
360
            ):
361
                # SFN has not yet create the log stream for the execution, only the validation steam.
362
                return False
1✔
363
            return True
1✔
364
        except ClientError:
×
365
            return False
×
366

367
    assert poll_condition(condition=_run_check)
1✔
368
    return log_stream_name
1✔
369

370

371
def await_on_execution_logs(
1✔
372
    target_aws_client,
373
    log_group_name: str,
374
    validation_function: Callable[[HistoryEventList], bool] = None,
375
) -> HistoryEventList:
376
    log_stream_name = _await_on_execution_log_stream_created(target_aws_client, log_group_name)
1✔
377

378
    logs_client = target_aws_client.logs
1✔
379
    events: HistoryEventList = []
1✔
380

381
    def _run_check():
1✔
382
        nonlocal events
383
        events.clear()
1✔
384
        try:
1✔
385
            log_events = logs_client.get_log_events(
1✔
386
                logGroupName=log_group_name, logStreamName=log_stream_name, startFromHead=True
387
            )["events"]
388
            events.extend([json.loads(e["message"]) for e in log_events])
1✔
389
        except ClientError:
×
390
            return False
×
391

392
        res = validation_function(events)
1✔
393
        return res
1✔
394

395
    assert poll_condition(condition=_run_check)
1✔
396
    return events
1✔
397

398

399
def create_state_machine_with_iam_role(
1✔
400
    target_aws_client,
401
    create_state_machine_iam_role,
402
    create_state_machine,
403
    snapshot,
404
    definition: Definition,
405
    logging_configuration: Optional[LoggingConfiguration] = None,
406
    state_machine_name: Optional[str] = None,
407
    state_machine_type: StateMachineType = StateMachineType.STANDARD,
408
):
409
    snf_role_arn = create_state_machine_iam_role(target_aws_client=target_aws_client)
1✔
410
    snapshot.add_transformer(RegexTransformer(snf_role_arn, "snf_role_arn"))
1✔
411
    snapshot.add_transformer(
1✔
412
        RegexTransformer(
413
            "Extended Request ID: [a-zA-Z0-9-/=+]+",
414
            "Extended Request ID: <extended_request_id>",
415
        )
416
    )
417
    snapshot.add_transformer(
1✔
418
        RegexTransformer("Request ID: [a-zA-Z0-9-]+", "Request ID: <request_id>")
419
    )
420

421
    sm_name: str = state_machine_name or f"statemachine_create_and_record_execution_{short_uid()}"
1✔
422
    create_arguments = {
1✔
423
        "name": sm_name,
424
        "definition": definition,
425
        "roleArn": snf_role_arn,
426
        "type": state_machine_type,
427
    }
428
    if logging_configuration is not None:
1✔
429
        create_arguments["loggingConfiguration"] = logging_configuration
1✔
430
    creation_resp = create_state_machine(target_aws_client, **create_arguments)
1✔
431
    snapshot.add_transformer(snapshot.transform.sfn_sm_create_arn(creation_resp, 0))
1✔
432
    state_machine_arn = creation_resp["stateMachineArn"]
1✔
433
    return state_machine_arn
1✔
434

435

436
def launch_and_record_execution(
1✔
437
    target_aws_client,
438
    sfn_snapshot,
439
    state_machine_arn,
440
    execution_input,
441
    verify_execution_description=False,
442
) -> LongArn:
443
    stepfunctions_client = target_aws_client.stepfunctions
1✔
444
    exec_resp = stepfunctions_client.start_execution(
1✔
445
        stateMachineArn=state_machine_arn, input=execution_input
446
    )
447
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_exec_arn(exec_resp, 0))
1✔
448
    execution_arn = exec_resp["executionArn"]
1✔
449

450
    await_execution_terminated(
1✔
451
        stepfunctions_client=stepfunctions_client, execution_arn=execution_arn
452
    )
453

454
    if verify_execution_description:
1✔
455
        describe_execution = stepfunctions_client.describe_execution(executionArn=execution_arn)
1✔
456
        sfn_snapshot.match("describe_execution", describe_execution)
1✔
457

458
    get_execution_history = stepfunctions_client.get_execution_history(executionArn=execution_arn)
1✔
459

460
    # Transform all map runs if any.
461
    try:
1✔
462
        map_run_arns = extract_json("$..mapRunArn", get_execution_history)
1✔
463
        if isinstance(map_run_arns, str):
1✔
464
            map_run_arns = [map_run_arns]
1✔
465
        for i, map_run_arn in enumerate(list(set(map_run_arns))):
1✔
466
            sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_map_run_arn(map_run_arn, i))
1✔
467
    except NoSuchJsonPathError:
1✔
468
        # No mapRunArns
469
        pass
1✔
470

471
    sfn_snapshot.match("get_execution_history", get_execution_history)
1✔
472

473
    return execution_arn
1✔
474

475

476
def launch_and_record_mocked_execution(
1✔
477
    target_aws_client,
478
    sfn_snapshot,
479
    state_machine_arn,
480
    execution_input,
481
    test_name,
482
) -> LongArn:
483
    stepfunctions_client = target_aws_client.stepfunctions
1✔
484
    exec_resp = stepfunctions_client.start_execution(
1✔
485
        stateMachineArn=f"{state_machine_arn}#{test_name}", input=execution_input
486
    )
487
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_exec_arn(exec_resp, 0))
1✔
488
    execution_arn = exec_resp["executionArn"]
1✔
489

490
    await_execution_terminated(
1✔
491
        stepfunctions_client=stepfunctions_client, execution_arn=execution_arn
492
    )
493

494
    get_execution_history = stepfunctions_client.get_execution_history(executionArn=execution_arn)
1✔
495

496
    # Transform all map runs if any.
497
    try:
1✔
498
        map_run_arns = extract_json("$..mapRunArn", get_execution_history)
1✔
499
        if isinstance(map_run_arns, str):
1✔
500
            map_run_arns = [map_run_arns]
×
501
        for i, map_run_arn in enumerate(list(set(map_run_arns))):
1✔
502
            sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_map_run_arn(map_run_arn, i))
1✔
503
    except NoSuchJsonPathError:
1✔
504
        # No mapRunArns
505
        pass
1✔
506

507
    sfn_snapshot.match("get_execution_history", get_execution_history)
1✔
508

509
    return execution_arn
1✔
510

511

512
def launch_and_record_mocked_sync_execution(
1✔
513
    target_aws_client,
514
    sfn_snapshot,
515
    state_machine_arn,
516
    execution_input,
517
    test_name,
518
) -> LongArn:
519
    stepfunctions_client = target_aws_client.stepfunctions
1✔
520

521
    exec_resp = stepfunctions_client.start_sync_execution(
1✔
522
        stateMachineArn=f"{state_machine_arn}#{test_name}",
523
        input=execution_input,
524
    )
525

526
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_sync_exec_arn(exec_resp, 0))
1✔
527

528
    sfn_snapshot.match("start_execution_sync_response", exec_resp)
1✔
529

530
    return exec_resp["executionArn"]
1✔
531

532

533
def launch_and_record_logs(
1✔
534
    target_aws_client,
535
    state_machine_arn,
536
    execution_input,
537
    log_level,
538
    log_group_name,
539
    sfn_snapshot,
540
):
541
    execution_arn = launch_and_record_execution(
1✔
542
        target_aws_client,
543
        sfn_snapshot,
544
        state_machine_arn,
545
        execution_input,
546
    )
547
    expected_events = get_expected_execution_logs(
1✔
548
        target_aws_client.stepfunctions, log_level, execution_arn
549
    )
550

551
    if log_level == LogLevel.OFF or not expected_events:
1✔
552
        # The test should terminate here, as no log streams for this execution would have been created.
553
        return
1✔
554

555
    logs_validation_function = is_execution_logs_list_complete(expected_events)
1✔
556
    logged_execution_events = await_on_execution_logs(
1✔
557
        target_aws_client, log_group_name, logs_validation_function
558
    )
559

560
    sfn_snapshot.add_transformer(
1✔
561
        JsonpathTransformer(
562
            jsonpath="$..event_timestamp",
563
            replacement="timestamp",
564
            replace_reference=False,
565
        )
566
    )
567
    sfn_snapshot.match("logged_execution_events", logged_execution_events)
1✔
568

569

570
def create_and_record_execution(
1✔
571
    target_aws_client,
572
    create_state_machine_iam_role,
573
    create_state_machine,
574
    sfn_snapshot,
575
    definition,
576
    execution_input,
577
    verify_execution_description=False,
578
) -> LongArn:
579
    state_machine_arn = create_state_machine_with_iam_role(
1✔
580
        target_aws_client,
581
        create_state_machine_iam_role,
582
        create_state_machine,
583
        sfn_snapshot,
584
        definition,
585
    )
586
    exeuction_arn = launch_and_record_execution(
1✔
587
        target_aws_client,
588
        sfn_snapshot,
589
        state_machine_arn,
590
        execution_input,
591
        verify_execution_description,
592
    )
593
    return exeuction_arn
1✔
594

595

596
def create_and_record_mocked_execution(
1✔
597
    target_aws_client,
598
    create_state_machine_iam_role,
599
    create_state_machine,
600
    sfn_snapshot,
601
    definition,
602
    execution_input,
603
    state_machine_name,
604
    test_name,
605
    state_machine_type: StateMachineType = StateMachineType.STANDARD,
606
) -> LongArn:
607
    state_machine_arn = create_state_machine_with_iam_role(
1✔
608
        target_aws_client,
609
        create_state_machine_iam_role,
610
        create_state_machine,
611
        sfn_snapshot,
612
        definition,
613
        state_machine_name=state_machine_name,
614
        state_machine_type=state_machine_type,
615
    )
616
    execution_arn = launch_and_record_mocked_execution(
1✔
617
        target_aws_client, sfn_snapshot, state_machine_arn, execution_input, test_name
618
    )
619
    return execution_arn
1✔
620

621

622
def create_and_record_mocked_sync_execution(
1✔
623
    target_aws_client,
624
    create_state_machine_iam_role,
625
    create_state_machine,
626
    sfn_snapshot,
627
    definition,
628
    execution_input,
629
    state_machine_name,
630
    test_name,
631
) -> LongArn:
632
    state_machine_arn = create_state_machine_with_iam_role(
1✔
633
        target_aws_client,
634
        create_state_machine_iam_role,
635
        create_state_machine,
636
        sfn_snapshot,
637
        definition,
638
        state_machine_name=state_machine_name,
639
        state_machine_type=StateMachineType.EXPRESS,
640
    )
641
    execution_arn = launch_and_record_mocked_sync_execution(
1✔
642
        target_aws_client, sfn_snapshot, state_machine_arn, execution_input, test_name
643
    )
644
    return execution_arn
1✔
645

646

647
def create_and_run_mock(
1✔
648
    target_aws_client,
649
    monkeypatch,
650
    mock_config_file,
651
    mock_config: dict,
652
    state_machine_name: str,
653
    definition_template: dict,
654
    execution_input: str,
655
    test_name: str,
656
):
657
    mock_config_file_path = mock_config_file(mock_config)
1✔
658
    monkeypatch.setattr(config, "SFN_MOCK_CONFIG", mock_config_file_path)
1✔
659

660
    sfn_client = target_aws_client.stepfunctions
1✔
661

662
    state_machine_name: str = state_machine_name or f"mocked_statemachine_{short_uid()}"
1✔
663
    definition = json.dumps(definition_template)
1✔
664
    creation_response = sfn_client.create_state_machine(
1✔
665
        name=state_machine_name,
666
        definition=definition,
667
        roleArn="arn:aws:iam::111111111111:role/mock-role/mocked-run",
668
    )
669
    state_machine_arn = creation_response["stateMachineArn"]
1✔
670

671
    test_case_arn = f"{state_machine_arn}#{test_name}"
1✔
672
    execution = sfn_client.start_execution(stateMachineArn=test_case_arn, input=execution_input)
1✔
673
    execution_arn = execution["executionArn"]
1✔
674

675
    await_execution_terminated(stepfunctions_client=sfn_client, execution_arn=execution_arn)
1✔
676
    sfn_client.delete_state_machine(stateMachineArn=state_machine_arn)
1✔
677

678
    return execution_arn
1✔
679

680

681
def create_and_record_logs(
1✔
682
    target_aws_client,
683
    create_state_machine_iam_role,
684
    create_state_machine,
685
    sfn_create_log_group,
686
    sfn_snapshot,
687
    definition,
688
    execution_input,
689
    log_level: LogLevel,
690
    include_execution_data: bool,
691
):
692
    state_machine_arn = create_state_machine_with_iam_role(
1✔
693
        target_aws_client,
694
        create_state_machine_iam_role,
695
        create_state_machine,
696
        sfn_snapshot,
697
        definition,
698
    )
699

700
    log_group_name = sfn_create_log_group()
1✔
701
    log_group_arn = target_aws_client.logs.describe_log_groups(logGroupNamePrefix=log_group_name)[
1✔
702
        "logGroups"
703
    ][0]["arn"]
704
    logging_configuration = LoggingConfiguration(
1✔
705
        level=log_level,
706
        includeExecutionData=include_execution_data,
707
        destinations=[
708
            LogDestination(
709
                cloudWatchLogsLogGroup=CloudWatchLogsLogGroup(logGroupArn=log_group_arn)
710
            ),
711
        ],
712
    )
713
    target_aws_client.stepfunctions.update_state_machine(
1✔
714
        stateMachineArn=state_machine_arn, loggingConfiguration=logging_configuration
715
    )
716

717
    launch_and_record_logs(
1✔
718
        target_aws_client,
719
        state_machine_arn,
720
        execution_input,
721
        log_level,
722
        log_group_name,
723
        sfn_snapshot,
724
    )
725

726

727
def launch_and_record_sync_execution(
1✔
728
    target_aws_client,
729
    sfn_snapshot,
730
    state_machine_arn,
731
    execution_input,
732
):
733
    exec_resp = target_aws_client.stepfunctions.start_sync_execution(
1✔
734
        stateMachineArn=state_machine_arn,
735
        input=execution_input,
736
    )
737
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_sync_exec_arn(exec_resp, 0))
1✔
738
    sfn_snapshot.match("start_execution_sync_response", exec_resp)
1✔
739

740

741
def create_and_record_express_sync_execution(
1✔
742
    target_aws_client,
743
    create_state_machine_iam_role,
744
    create_state_machine,
745
    sfn_snapshot,
746
    definition,
747
    execution_input,
748
):
749
    snf_role_arn = create_state_machine_iam_role(target_aws_client=target_aws_client)
1✔
750
    sfn_snapshot.add_transformer(RegexTransformer(snf_role_arn, "sfn_role_arn"))
1✔
751

752
    creation_response = create_state_machine(
1✔
753
        target_aws_client,
754
        name=f"express_statemachine_{short_uid()}",
755
        definition=definition,
756
        roleArn=snf_role_arn,
757
        type=StateMachineType.EXPRESS,
758
    )
759
    state_machine_arn = creation_response["stateMachineArn"]
1✔
760
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_create_arn(creation_response, 0))
1✔
761
    sfn_snapshot.match("creation_response", creation_response)
1✔
762

763
    launch_and_record_sync_execution(
1✔
764
        target_aws_client,
765
        sfn_snapshot,
766
        state_machine_arn,
767
        execution_input,
768
    )
769

770

771
def launch_and_record_express_async_execution(
1✔
772
    target_aws_client,
773
    sfn_snapshot,
774
    state_machine_arn,
775
    log_group_name,
776
    execution_input,
777
):
778
    start_execution = target_aws_client.stepfunctions.start_execution(
1✔
779
        stateMachineArn=state_machine_arn, input=execution_input
780
    )
781
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_express_exec_arn(start_execution, 0))
1✔
782
    execution_arn = start_execution["executionArn"]
1✔
783

784
    event_list = await_on_execution_logs(
1✔
785
        target_aws_client, log_group_name, validation_function=_is_last_history_event_terminal
786
    )
787
    # Snapshot only the end event, as AWS StepFunctions implements a flaky approach to logging previous events.
788
    end_event = event_list[-1]
1✔
789
    sfn_snapshot.match("end_event", end_event)
1✔
790

791
    return execution_arn
1✔
792

793

794
def create_and_record_express_async_execution(
1✔
795
    target_aws_client,
796
    create_state_machine_iam_role,
797
    create_state_machine,
798
    sfn_create_log_group,
799
    sfn_snapshot,
800
    definition,
801
    execution_input,
802
    include_execution_data: bool = True,
803
) -> tuple[LongArn, LongArn]:
804
    snf_role_arn = create_state_machine_iam_role(target_aws_client)
1✔
805
    sfn_snapshot.add_transformer(RegexTransformer(snf_role_arn, "sfn_role_arn"))
1✔
806

807
    log_group_name = sfn_create_log_group()
1✔
808
    log_group_arn = target_aws_client.logs.describe_log_groups(logGroupNamePrefix=log_group_name)[
1✔
809
        "logGroups"
810
    ][0]["arn"]
811
    logging_configuration = LoggingConfiguration(
1✔
812
        level=LogLevel.ALL,
813
        includeExecutionData=include_execution_data,
814
        destinations=[
815
            LogDestination(
816
                cloudWatchLogsLogGroup=CloudWatchLogsLogGroup(logGroupArn=log_group_arn)
817
            ),
818
        ],
819
    )
820

821
    creation_response = create_state_machine(
1✔
822
        target_aws_client,
823
        name=f"express_statemachine_{short_uid()}",
824
        definition=definition,
825
        roleArn=snf_role_arn,
826
        type=StateMachineType.EXPRESS,
827
        loggingConfiguration=logging_configuration,
828
    )
829
    state_machine_arn = creation_response["stateMachineArn"]
1✔
830
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_create_arn(creation_response, 0))
1✔
831
    sfn_snapshot.match("creation_response", creation_response)
1✔
832

833
    execution_arn = launch_and_record_express_async_execution(
1✔
834
        target_aws_client,
835
        sfn_snapshot,
836
        state_machine_arn,
837
        log_group_name,
838
        execution_input,
839
    )
840
    return state_machine_arn, execution_arn
1✔
841

842

843
def create_and_record_events(
1✔
844
    create_state_machine_iam_role,
845
    create_state_machine,
846
    sfn_events_to_sqs_queue,
847
    target_aws_client,
848
    sfn_snapshot,
849
    definition,
850
    execution_input,
851
):
852
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sqs_integration())
1✔
853
    sfn_snapshot.add_transformers_list(
1✔
854
        [
855
            JsonpathTransformer(
856
                jsonpath="$..detail.startDate",
857
                replacement="start-date",
858
                replace_reference=False,
859
            ),
860
            JsonpathTransformer(
861
                jsonpath="$..detail.stopDate",
862
                replacement="stop-date",
863
                replace_reference=False,
864
            ),
865
            JsonpathTransformer(
866
                jsonpath="$..detail.name",
867
                replacement="test_event_bridge_events-{short_uid()}",
868
                replace_reference=False,
869
            ),
870
        ]
871
    )
872

873
    snf_role_arn = create_state_machine_iam_role(target_aws_client)
1✔
874
    create_output: CreateStateMachineOutput = create_state_machine(
1✔
875
        target_aws_client,
876
        name=f"test_event_bridge_events-{short_uid()}",
877
        definition=definition,
878
        roleArn=snf_role_arn,
879
    )
880
    state_machine_arn = create_output["stateMachineArn"]
1✔
881

882
    queue_url = sfn_events_to_sqs_queue(state_machine_arn=state_machine_arn)
1✔
883

884
    start_execution = target_aws_client.stepfunctions.start_execution(
1✔
885
        stateMachineArn=state_machine_arn, input=execution_input
886
    )
887
    execution_arn = start_execution["executionArn"]
1✔
888
    await_execution_terminated(
1✔
889
        stepfunctions_client=target_aws_client.stepfunctions, execution_arn=execution_arn
890
    )
891

892
    stepfunctions_events = []
1✔
893

894
    def _get_events():
1✔
895
        received = target_aws_client.sqs.receive_message(QueueUrl=queue_url)
1✔
896
        for message in received.get("Messages", []):
1✔
897
            body = json.loads(message["Body"])
1✔
898
            stepfunctions_events.append(body)
1✔
899
        stepfunctions_events.sort(key=lambda e: e["time"])
1✔
900
        return stepfunctions_events and stepfunctions_events[-1]["detail"]["status"] != "RUNNING"
1✔
901

902
    poll_condition(_get_events, timeout=60)
1✔
903

904
    sfn_snapshot.match("stepfunctions_events", stepfunctions_events)
1✔
905

906

907
def record_sqs_events(target_aws_client, queue_url, sfn_snapshot, num_events):
1✔
908
    stepfunctions_events = []
1✔
909

910
    def _get_events():
1✔
911
        received = target_aws_client.sqs.receive_message(QueueUrl=queue_url)
1✔
912
        for message in received.get("Messages", []):
1✔
913
            body = json.loads(message["Body"])
1✔
914
            stepfunctions_events.append(body)
1✔
915
        stepfunctions_events.sort(key=lambda e: e["time"])
1✔
916
        return len(stepfunctions_events) == num_events
1✔
917

918
    poll_condition(_get_events, timeout=60)
1✔
919
    stepfunctions_events.sort(key=lambda e: json.dumps(e.get("detail", {})))
1✔
920
    sfn_snapshot.match("stepfunctions_events", stepfunctions_events)
1✔
921

922

923
class SfnNoneRecursiveParallelTransformer:
1✔
924
    """
925
    Normalises a sublist of events triggered in by a Parallel state to be order-independent.
926
    """
927

928
    def __init__(self, events_jsonpath: str = "$..events"):
1✔
929
        self.events_jsonpath: str = events_jsonpath
1✔
930

931
    @staticmethod
1✔
932
    def _normalise_events(events: list[dict]) -> None:
1✔
933
        start_idx = None
1✔
934
        sublist = []
1✔
935
        in_sublist = False
1✔
936
        for i, event in enumerate(events):
1✔
937
            event_type = event.get("type")
1✔
938
            if event_type is None:
1✔
939
                LOG.debug(
×
940
                    "No 'type' in event item '%s'.",
941
                    event,
942
                )
943
                in_sublist = False
×
944

945
            elif event_type in {
1✔
946
                None,
947
                HistoryEventType.ParallelStateSucceeded,
948
                HistoryEventType.ParallelStateAborted,
949
                HistoryEventType.ParallelStateExited,
950
                HistoryEventType.ParallelStateFailed,
951
            }:
952
                events[start_idx:i] = sorted(sublist, key=lambda e: to_json_str(e))
1✔
953
                in_sublist = False
1✔
954
            elif event_type == HistoryEventType.ParallelStateStarted:
1✔
955
                in_sublist = True
1✔
956
                sublist = []
1✔
957
                start_idx = i + 1
1✔
958
            elif in_sublist:
1✔
959
                event["id"] = (0,)
1✔
960
                event["previousEventId"] = 0
1✔
961
                sublist.append(event)
1✔
962

963
    def transform(self, input_data: dict, *, ctx: TransformContext) -> dict:
1✔
964
        pattern = parse("$..events")
1✔
965
        events = pattern.find(input_data)
1✔
966
        if not events:
1✔
967
            LOG.debug("No Stepfunctions 'events' for jsonpath '%s'.", self.events_jsonpath)
×
968
            return input_data
×
969

970
        for events_data in events:
1✔
971
            self._normalise_events(events_data.value)
1✔
972

973
        return input_data
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc