• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 972647ca-f36e-4dab-befa-d0de828061c5

17 Apr 2025 08:11PM UTC coverage: 86.255% (-0.02%) from 86.279%
972647ca-f36e-4dab-befa-d0de828061c5

push

circleci

web-flow
Step Functions: Surface Support for Mocked Responses (#12525)

200 of 245 new or added lines in 9 files covered. (81.63%)

90 existing lines in 16 files now uncovered.

63871 of 74049 relevant lines covered (86.26%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.77
/localstack-core/localstack/testing/pytest/stepfunctions/utils.py
1
import json
1✔
2
import logging
1✔
3
from typing import Callable, Final, Optional
1✔
4

5
from botocore.exceptions import ClientError
1✔
6
from jsonpath_ng.ext import parse
1✔
7
from localstack_snapshot.snapshots.transformer import (
1✔
8
    JsonpathTransformer,
9
    RegexTransformer,
10
    TransformContext,
11
)
12

13
from localstack.aws.api.stepfunctions import (
1✔
14
    Arn,
15
    CloudWatchLogsLogGroup,
16
    CreateStateMachineOutput,
17
    Definition,
18
    ExecutionStatus,
19
    HistoryEventList,
20
    HistoryEventType,
21
    LogDestination,
22
    LoggingConfiguration,
23
    LogLevel,
24
    LongArn,
25
    StateMachineType,
26
)
27
from localstack.services.stepfunctions.asl.eval.event.logging import is_logging_enabled_for
1✔
28
from localstack.services.stepfunctions.asl.utils.encoding import to_json_str
1✔
29
from localstack.services.stepfunctions.asl.utils.json_path import NoSuchJsonPathError, extract_json
1✔
30
from localstack.utils.strings import short_uid
1✔
31
from localstack.utils.sync import poll_condition
1✔
32

33
LOG = logging.getLogger(__name__)
1✔
34

35

36
# For EXPRESS state machines, the deletion will happen eventually (usually less than a minute).
37
# Running executions may emit logs after DeleteStateMachine API is called.
38
_DELETION_TIMEOUT_SECS: Final[int] = 120
1✔
39

40

41
def await_no_state_machines_listed(stepfunctions_client):
    """Wait until ``list_state_machines`` reports no state machines.

    The check is polled with ``interval=1`` for up to ``_DELETION_TIMEOUT_SECS``.
    When the poll does not succeed, a warning is logged; nothing is raised.
    """

    def _nothing_listed() -> bool:
        # An empty "stateMachines" list is falsy.
        return not stepfunctions_client.list_state_machines()["stateMachines"]

    if poll_condition(
        condition=_nothing_listed,
        timeout=_DELETION_TIMEOUT_SECS,
        interval=1,
    ):
        return
    LOG.warning("Timed out whilst awaiting for listing to be empty.")
×
54

55

56
def _is_state_machine_alias_listed(
    stepfunctions_client, state_machine_arn: Arn, state_machine_alias_arn: Arn
):
    """Return True if ``state_machine_alias_arn`` is among the aliases listed
    for ``state_machine_arn``, False otherwise."""
    response = stepfunctions_client.list_state_machine_aliases(stateMachineArn=state_machine_arn)
    return any(
        alias["stateMachineAliasArn"] == state_machine_alias_arn
        for alias in response["stateMachineAliases"]
    )
1✔
67

68

69
def await_state_machine_alias_is_created(
    stepfunctions_client, state_machine_arn: Arn, state_machine_alias_arn: Arn
):
    """Wait until the alias is listed for the given state machine.

    Polls ``_is_state_machine_alias_listed`` with ``interval=1`` for up to
    ``_DELETION_TIMEOUT_SECS``. If the poll does not succeed, a warning is
    logged; nothing is raised and nothing is returned.

    :param stepfunctions_client: client exposing ``list_state_machine_aliases``.
    :param state_machine_arn: ARN of the state machine whose aliases are listed.
    :param state_machine_alias_arn: ARN of the alias expected to appear.
    """
    success = poll_condition(
        condition=lambda: _is_state_machine_alias_listed(
            stepfunctions_client=stepfunctions_client,
            state_machine_arn=state_machine_arn,
            state_machine_alias_arn=state_machine_alias_arn,
        ),
        timeout=_DELETION_TIMEOUT_SECS,
        interval=1,
    )
    if not success:
        # Report which alias was awaited and on which state machine.
        LOG.warning(
            "Timed out whilst awaiting for alias '%s' of '%s' to be listed.",
            state_machine_alias_arn,
            state_machine_arn,
        )
×
83

84

85
def await_state_machine_alias_is_deleted(
    stepfunctions_client, state_machine_arn: Arn, state_machine_alias_arn: Arn
):
    """Wait until the alias is no longer listed for the given state machine.

    Polls the negation of ``_is_state_machine_alias_listed`` with ``interval=1``
    for up to ``_DELETION_TIMEOUT_SECS``. If the poll does not succeed, a
    warning is logged; nothing is raised and nothing is returned.

    :param stepfunctions_client: client exposing ``list_state_machine_aliases``.
    :param state_machine_arn: ARN of the state machine whose aliases are listed.
    :param state_machine_alias_arn: ARN of the alias expected to disappear.
    """
    success = poll_condition(
        condition=lambda: not _is_state_machine_alias_listed(
            stepfunctions_client=stepfunctions_client,
            state_machine_arn=state_machine_arn,
            state_machine_alias_arn=state_machine_alias_arn,
        ),
        timeout=_DELETION_TIMEOUT_SECS,
        interval=1,
    )
    if not success:
        # Report which alias was awaited and on which state machine.
        LOG.warning(
            "Timed out whilst awaiting for alias '%s' of '%s' to no longer be listed.",
            state_machine_alias_arn,
            state_machine_arn,
        )
×
99

100

101
def _is_state_machine_listed(stepfunctions_client, state_machine_arn: str) -> bool:
1✔
102
    lst_resp = stepfunctions_client.list_state_machines()
1✔
103
    state_machines = lst_resp["stateMachines"]
1✔
104
    for state_machine in state_machines:
1✔
105
        if state_machine["stateMachineArn"] == state_machine_arn:
1✔
106
            return True
1✔
107
    return False
1✔
108

109

110
def _is_state_machine_version_listed(
1✔
111
    stepfunctions_client, state_machine_arn: str, state_machine_version_arn: str
112
) -> bool:
113
    lst_resp = stepfunctions_client.list_state_machine_versions(stateMachineArn=state_machine_arn)
1✔
114
    versions = lst_resp["stateMachineVersions"]
1✔
115
    for version in versions:
1✔
116
        if version["stateMachineVersionArn"] == state_machine_version_arn:
1✔
117
            return True
1✔
118
    return False
1✔
119

120

121
def await_state_machine_not_listed(stepfunctions_client, state_machine_arn: str):
    """Wait until ``state_machine_arn`` is absent from the state machine listing.

    Polled with ``interval=1`` for up to ``_DELETION_TIMEOUT_SECS``; an
    unsuccessful poll is only logged as a warning.
    """

    def _is_absent() -> bool:
        return not _is_state_machine_listed(stepfunctions_client, state_machine_arn)

    if not poll_condition(
        condition=_is_absent,
        timeout=_DELETION_TIMEOUT_SECS,
        interval=1,
    ):
        LOG.warning("Timed out whilst awaiting for listing to exclude '%s'.", state_machine_arn)
×
129

130

131
def await_state_machine_listed(stepfunctions_client, state_machine_arn: str):
    """Wait until ``state_machine_arn`` is present in the state machine listing.

    Polled with ``interval=1`` for up to ``_DELETION_TIMEOUT_SECS``; an
    unsuccessful poll is only logged as a warning.
    """

    def _is_present() -> bool:
        return _is_state_machine_listed(stepfunctions_client, state_machine_arn)

    if not poll_condition(
        condition=_is_present,
        timeout=_DELETION_TIMEOUT_SECS,
        interval=1,
    ):
        LOG.warning("Timed out whilst awaiting for listing to include '%s'.", state_machine_arn)
×
139

140

141
def await_state_machine_version_not_listed(
    stepfunctions_client, state_machine_arn: str, state_machine_version_arn: str
):
    """Wait until the version ARN is absent from the versions listed for the state machine.

    Polled with ``interval=1`` for up to ``_DELETION_TIMEOUT_SECS``; an
    unsuccessful poll is only logged as a warning.
    """

    def _version_is_absent() -> bool:
        return not _is_state_machine_version_listed(
            stepfunctions_client, state_machine_arn, state_machine_version_arn
        )

    if poll_condition(
        condition=_version_is_absent,
        timeout=_DELETION_TIMEOUT_SECS,
        interval=1,
    ):
        return
    LOG.warning(
        "Timed out whilst awaiting for version of %s to exclude '%s'.",
        state_machine_arn,
        state_machine_version_arn,
    )
157

158

159
def await_state_machine_version_listed(
    stepfunctions_client, state_machine_arn: str, state_machine_version_arn: str
):
    """Wait until the version ARN is present among the versions listed for the state machine.

    Polled with ``interval=1`` for up to ``_DELETION_TIMEOUT_SECS``; an
    unsuccessful poll is only logged as a warning.
    """

    def _version_is_present() -> bool:
        return _is_state_machine_version_listed(
            stepfunctions_client, state_machine_arn, state_machine_version_arn
        )

    if poll_condition(
        condition=_version_is_present,
        timeout=_DELETION_TIMEOUT_SECS,
        interval=1,
    ):
        return
    LOG.warning(
        "Timed out whilst awaiting for version of %s to include '%s'.",
        state_machine_arn,
        state_machine_version_arn,
    )
175

176

177
def await_on_execution_events(
    stepfunctions_client, execution_arn: str, check_func: Callable[[HistoryEventList], bool]
) -> HistoryEventList:
    """Poll the execution history until ``check_func`` accepts it.

    On each attempt the events returned by ``get_execution_history`` are sorted
    by their ``timestamp`` entry and handed to ``check_func``. A ``ClientError``
    from the call counts as a failed attempt. The poll result is asserted.

    :return: the events seen by the last attempt.
    """
    collected: HistoryEventList = []

    def _attempt():
        # Start from an empty list on every attempt, also when the call fails.
        collected.clear()
        try:
            history = stepfunctions_client.get_execution_history(executionArn=execution_arn)
        except ClientError:
            return False
        ordered = sorted(history.get("events", []), key=lambda item: item.get("timestamp"))
        collected.extend(ordered)
        return check_func(collected)

    satisfied = poll_condition(condition=_attempt, timeout=120, interval=1)
    assert satisfied
    return collected
1✔
195

196

197
def await_execution_success(stepfunctions_client, execution_arn: str) -> HistoryEventList:
    """Wait until the last history event of the execution carries
    ``executionSucceededEventDetails`` and return the history events."""

    def _last_event_is_success(history: HistoryEventList) -> bool:
        # An empty history can never satisfy the check.
        return bool(history) and "executionSucceededEventDetails" in history[-1]

    return await_on_execution_events(
        stepfunctions_client=stepfunctions_client,
        execution_arn=execution_arn,
        check_func=_last_event_is_success,
    )
209

210

211
def await_list_execution_status(
    stepfunctions_client, state_machine_arn: str, execution_arn: str, status: str
):
    """Wait until ``list_executions`` reports ``execution_arn`` with ``status``.

    Required as there is some eventual consistency in list_executions vs
    describe_execution and get_execution_history. An unsuccessful poll is only
    logged as a warning.
    """

    def _is_listed_with_status() -> bool:
        response = stepfunctions_client.list_executions(
            stateMachineArn=state_machine_arn, statusFilter=status
        )
        return any(
            entry["executionArn"] == execution_arn and entry["status"] == status
            for entry in response.get("executions", [])
        )

    if poll_condition(condition=_is_listed_with_status, timeout=120, interval=1):
        return
    LOG.warning(
        "Timed out whilst awaiting for execution status %s to satisfy condition for execution '%s'.",
        status,
        execution_arn,
    )
233

234

235
def _is_last_history_event_terminal(events: HistoryEventList) -> bool:
    """Return True if the last event has no ``type`` or has one of the
    ExecutionFailed/Aborted/TimedOut/Succeeded types; False for an empty list."""
    if not events:
        return False

    final_type = events[-1].get("type")
    if final_type is None:
        return True
    return final_type in {
        HistoryEventType.ExecutionFailed,
        HistoryEventType.ExecutionAborted,
        HistoryEventType.ExecutionTimedOut,
        HistoryEventType.ExecutionSucceeded,
    }
1✔
246

247

248
def await_execution_terminated(stepfunctions_client, execution_arn: str) -> HistoryEventList:
    """Wait until ``_is_last_history_event_terminal`` accepts the execution
    history and return the history events."""
    history: HistoryEventList = await_on_execution_events(
        stepfunctions_client=stepfunctions_client,
        execution_arn=execution_arn,
        check_func=_is_last_history_event_terminal,
    )
    return history
254

255

256
def await_execution_lists_terminated(
    stepfunctions_client, state_machine_arn: str, execution_arn: str
):
    """Wait until ``list_executions`` reports the execution with a status other than RUNNING.

    An execution that is not listed at all counts as not yet terminated. An
    unsuccessful poll is only logged as a warning.
    """

    def _is_listed_as_not_running() -> bool:
        listed = stepfunctions_client.list_executions(stateMachineArn=state_machine_arn)[
            "executions"
        ]
        # Only the first entry with a matching ARN is considered.
        entry = next((item for item in listed if item["executionArn"] == execution_arn), None)
        if entry is None:
            return False
        return entry["status"] != ExecutionStatus.RUNNING

    if poll_condition(condition=_is_listed_as_not_running, timeout=120, interval=1):
        return
    LOG.warning(
        "Timed out whilst awaiting for execution events to satisfy condition for execution '%s'.",
        execution_arn,
    )
273

274

275
def await_execution_started(stepfunctions_client, execution_arn: str) -> HistoryEventList:
    """Wait until the execution history contains an execution-started event.

    :param stepfunctions_client: client exposing ``get_execution_history``.
    :param execution_arn: ARN of the execution to inspect.
    :return: the history events as returned by ``await_on_execution_events``.
    """

    def _check_stated_exists(events: HistoryEventList) -> bool:
        # Inspect every event, not just the first one: the started event is
        # accepted wherever it sits in the list.
        return any("executionStartedEventDetails" in event for event in events)

    return await_on_execution_events(
        stepfunctions_client=stepfunctions_client,
        execution_arn=execution_arn,
        check_func=_check_stated_exists,
    )
286

287

288
def await_execution_aborted(stepfunctions_client, execution_arn: str):
    """Wait until ``describe_execution`` reports the ABORTED status.

    An unsuccessful poll is only logged as a warning.
    """

    def _is_aborted() -> bool:
        description = stepfunctions_client.describe_execution(executionArn=execution_arn)
        return description["status"] == ExecutionStatus.ABORTED

    if not poll_condition(condition=_is_aborted, timeout=120, interval=1):
        LOG.warning("Timed out whilst awaiting for execution '%s' to abort.", execution_arn)
×
297

298

299
def get_expected_execution_logs(
    stepfunctions_client, log_level: LogLevel, execution_arn: LongArn
) -> HistoryEventList:
    """Return the history events of the execution for which
    ``is_logging_enabled_for`` is true at ``log_level``, in history order."""
    history = stepfunctions_client.get_execution_history(executionArn=execution_arn)
    expected: HistoryEventList = []
    for history_event in history["events"]:
        if is_logging_enabled_for(log_level=log_level, history_event_type=history_event["type"]):
            expected.append(history_event)
    return expected
1✔
310

311

312
def is_execution_logs_list_complete(
    expected_events: HistoryEventList,
) -> Callable[[HistoryEventList], bool]:
    """Build a predicate that accepts a list of log events once it has as many
    entries as ``expected_events``; with no expected events it accepts anything."""

    def _has_expected_length(log_events: list) -> bool:
        # Only the lengths are compared, not the event contents.
        return not expected_events or len(expected_events) == len(log_events)

    return _has_expected_length
1✔
321

322

323
def _await_on_execution_log_stream_created(target_aws_client, log_group_name: str) -> str:
    """Wait for a log stream in ``log_group_name`` other than the validation
    stream and return its name.

    Only the last entry of ``describe_log_streams`` is inspected. A
    ``ClientError`` counts as a failed attempt. The poll result is asserted.
    """
    validation_stream_name = "log_stream_created_by_aws_to_validate_log_delivery_subscriptions"
    logs_client = target_aws_client.logs
    log_stream_name = ""

    def _stream_exists() -> bool:
        nonlocal log_stream_name
        try:
            streams = logs_client.describe_log_streams(logGroupName=log_group_name)["logStreams"]
            if not streams:
                return False
            log_stream_name = streams[-1]["logStreamName"]
            # SFN has not yet created the log stream for the execution if only
            # the validation stream is present.
            return log_stream_name != validation_stream_name
        except ClientError:
            return False

    created = poll_condition(condition=_stream_exists)
    assert created
    return log_stream_name
1✔
349

350

351
def await_on_execution_logs(
    target_aws_client,
    log_group_name: str,
    validation_function: Optional[Callable[[HistoryEventList], bool]] = None,
) -> HistoryEventList:
    """Poll the execution's log stream until the logged events pass validation.

    The log stream name is obtained from ``_await_on_execution_log_stream_created``.
    On each attempt all events of the stream are read from the head and each
    ``message`` is decoded as JSON. A ``ClientError`` counts as a failed attempt.
    The poll result is asserted.

    :param target_aws_client: client factory exposing a ``logs`` client.
    :param log_group_name: name of the log group to read from.
    :param validation_function: predicate over the decoded events; when omitted
        (``None``), the first successful read is accepted.
    :return: the decoded events seen by the last attempt.
    """
    log_stream_name = _await_on_execution_log_stream_created(target_aws_client, log_group_name)

    logs_client = target_aws_client.logs
    events: HistoryEventList = list()

    def _run_check():
        # Start from an empty list on every attempt, also when the call fails.
        events.clear()
        try:
            log_events = logs_client.get_log_events(
                logGroupName=log_group_name, logStreamName=log_stream_name, startFromHead=True
            )["events"]
            events.extend([json.loads(e["message"]) for e in log_events])
        except ClientError:
            return False

        # The parameter defaults to None; calling it unguarded would raise a
        # TypeError inside the poll.
        if validation_function is None:
            return True
        return validation_function(events)

    assert poll_condition(condition=_run_check)
    return events
1✔
377

378

379
def create_state_machine_with_iam_role(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    snapshot,
    definition: Definition,
    logging_configuration: Optional[LoggingConfiguration] = None,
    state_machine_name: Optional[str] = None,
):
    """Create an IAM role and a state machine using it, and return the state machine ARN.

    Registers snapshot transformers for the role ARN, for the request-id texts
    and for the created state machine ARN. When ``state_machine_name`` is not
    given, a name with a ``short_uid()`` suffix is generated. The
    ``loggingConfiguration`` argument is only passed on when one is provided.
    """
    role_arn = create_state_machine_iam_role(target_aws_client=target_aws_client)

    # Registration order is kept: role ARN first, then the request-id patterns.
    regex_replacements = (
        (role_arn, "snf_role_arn"),
        (
            "Extended Request ID: [a-zA-Z0-9-/=+]+",
            "Extended Request ID: <extended_request_id>",
        ),
        ("Request ID: [a-zA-Z0-9-]+", "Request ID: <request_id>"),
    )
    for pattern, replacement in regex_replacements:
        snapshot.add_transformer(RegexTransformer(pattern, replacement))

    name: str = state_machine_name or f"statemachine_create_and_record_execution_{short_uid()}"
    optional_arguments = (
        {} if logging_configuration is None else {"loggingConfiguration": logging_configuration}
    )
    creation_resp = create_state_machine(
        target_aws_client,
        name=name,
        definition=definition,
        roleArn=role_arn,
        **optional_arguments,
    )
    snapshot.add_transformer(snapshot.transform.sfn_sm_create_arn(creation_resp, 0))
    return creation_resp["stateMachineArn"]
1✔
412

413

414
def launch_and_record_execution(
    target_aws_client,
    sfn_snapshot,
    state_machine_arn,
    execution_input,
    verify_execution_description=False,
) -> LongArn:
    """Start an execution, wait for it to terminate and snapshot its history.

    :param target_aws_client: client factory exposing a ``stepfunctions`` client.
    :param sfn_snapshot: snapshot fixture used to register transformers and record matches.
    :param state_machine_arn: ARN of the state machine to execute.
    :param execution_input: input passed to ``start_execution``.
    :param verify_execution_description: when true, the ``describe_execution``
        response is snapshotted as ``describe_execution`` as well.
    :return: the ARN of the started execution.
    """
    stepfunctions_client = target_aws_client.stepfunctions
    exec_resp = stepfunctions_client.start_execution(
        stateMachineArn=state_machine_arn, input=execution_input
    )
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_exec_arn(exec_resp, 0))
    execution_arn = exec_resp["executionArn"]

    await_execution_terminated(
        stepfunctions_client=stepfunctions_client, execution_arn=execution_arn
    )

    if verify_execution_description:
        describe_execution = stepfunctions_client.describe_execution(executionArn=execution_arn)
        sfn_snapshot.match("describe_execution", describe_execution)

    get_execution_history = stepfunctions_client.get_execution_history(executionArn=execution_arn)

    # Transform all map runs if any.
    try:
        map_run_arns = extract_json("$..mapRunArn", get_execution_history)
        if isinstance(map_run_arns, str):
            map_run_arns = [map_run_arns]
        # Deduplicate while keeping first-occurrence order, so the index given
        # to each map run ARN is the same on every run (set order is not).
        for i, map_run_arn in enumerate(dict.fromkeys(map_run_arns)):
            sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_map_run_arn(map_run_arn, i))
    except NoSuchJsonPathError:
        # No mapRunArns
        pass

    sfn_snapshot.match("get_execution_history", get_execution_history)

    return execution_arn
1✔
452

453

454
def launch_and_record_mocked_execution(
    target_aws_client,
    sfn_snapshot,
    state_machine_arn,
    execution_input,
    test_name,
) -> LongArn:
    """Start an execution for a named test case, wait for it to terminate and
    snapshot its history.

    The execution is started against ``"{state_machine_arn}#{test_name}"``.

    :param target_aws_client: client factory exposing a ``stepfunctions`` client.
    :param sfn_snapshot: snapshot fixture used to register transformers and record matches.
    :param state_machine_arn: ARN of the state machine to execute.
    :param execution_input: input passed to ``start_execution``.
    :param test_name: suffix appended to the state machine ARN after ``#``.
    :return: the ARN of the started execution.
    """
    stepfunctions_client = target_aws_client.stepfunctions
    exec_resp = stepfunctions_client.start_execution(
        stateMachineArn=f"{state_machine_arn}#{test_name}", input=execution_input
    )
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_exec_arn(exec_resp, 0))
    execution_arn = exec_resp["executionArn"]

    await_execution_terminated(
        stepfunctions_client=stepfunctions_client, execution_arn=execution_arn
    )

    get_execution_history = stepfunctions_client.get_execution_history(executionArn=execution_arn)

    # Transform all map runs if any.
    try:
        map_run_arns = extract_json("$..mapRunArn", get_execution_history)
        if isinstance(map_run_arns, str):
            map_run_arns = [map_run_arns]
        # Deduplicate while keeping first-occurrence order, so the index given
        # to each map run ARN is the same on every run (set order is not).
        for i, map_run_arn in enumerate(dict.fromkeys(map_run_arns)):
            sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_map_run_arn(map_run_arn, i))
    except NoSuchJsonPathError:
        # No mapRunArns
        pass

    sfn_snapshot.match("get_execution_history", get_execution_history)

    return execution_arn
1✔
488

489

490
def launch_and_record_logs(
    target_aws_client,
    state_machine_arn,
    execution_input,
    log_level,
    log_group_name,
    sfn_snapshot,
):
    """Run an execution via ``launch_and_record_execution`` and snapshot the events it logged.

    Returns early, without reading any logs, when ``log_level`` is OFF or no
    history event is expected to be logged at that level.
    """
    execution_arn = launch_and_record_execution(
        target_aws_client, sfn_snapshot, state_machine_arn, execution_input
    )
    expected = get_expected_execution_logs(
        target_aws_client.stepfunctions, log_level, execution_arn
    )

    nothing_to_read = log_level == LogLevel.OFF or not expected
    if nothing_to_read:
        # The test should terminate here, as no log streams for this execution would have been created.
        return

    logged = await_on_execution_logs(
        target_aws_client, log_group_name, is_execution_logs_list_complete(expected)
    )

    timestamp_transformer = JsonpathTransformer(
        jsonpath="$..event_timestamp",
        replacement="timestamp",
        replace_reference=False,
    )
    sfn_snapshot.add_transformer(timestamp_transformer)
    sfn_snapshot.match("logged_execution_events", logged)
1✔
525

526

527
def create_and_record_execution(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    sfn_snapshot,
    definition,
    execution_input,
    verify_execution_description=False,
) -> LongArn:
    """Create a state machine, run one execution and snapshot the results.

    Combines ``create_state_machine_with_iam_role`` and
    ``launch_and_record_execution``.

    :param target_aws_client: client factory passed on to both helpers.
    :param create_state_machine_iam_role: factory creating the IAM role.
    :param create_state_machine: factory creating the state machine.
    :param sfn_snapshot: snapshot fixture used to register transformers and record matches.
    :param definition: state machine definition.
    :param execution_input: input passed to the execution.
    :param verify_execution_description: forwarded to ``launch_and_record_execution``.
    :return: the ARN of the started execution, for manual assertions.
    """
    state_machine_arn = create_state_machine_with_iam_role(
        target_aws_client,
        create_state_machine_iam_role,
        create_state_machine,
        sfn_snapshot,
        definition,
    )
    # Hand the execution ARN back to the caller instead of discarding it.
    return launch_and_record_execution(
        target_aws_client,
        sfn_snapshot,
        state_machine_arn,
        execution_input,
        verify_execution_description,
    )
551

552

553
def create_and_record_mocked_execution(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    sfn_snapshot,
    definition,
    execution_input,
    state_machine_name,
    test_name,
):
    """Create a state machine named ``state_machine_name`` and record one
    execution started for the ``test_name`` test case."""
    arn = create_state_machine_with_iam_role(
        target_aws_client,
        create_state_machine_iam_role,
        create_state_machine,
        sfn_snapshot,
        definition,
        state_machine_name=state_machine_name,
    )
    launch_and_record_mocked_execution(
        target_aws_client,
        sfn_snapshot,
        arn,
        execution_input,
        test_name,
    )
574

575

576
def create_and_record_logs(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    sfn_create_log_group,
    sfn_snapshot,
    definition,
    execution_input,
    log_level: LogLevel,
    include_execution_data: bool,
):
    """Create a state machine, point its logging at a new log group, then run
    an execution and snapshot the logged events.

    The logging configuration is applied with ``update_state_machine`` after
    the state machine has been created.
    """
    state_machine_arn = create_state_machine_with_iam_role(
        target_aws_client,
        create_state_machine_iam_role,
        create_state_machine,
        sfn_snapshot,
        definition,
    )

    log_group_name = sfn_create_log_group()
    described_groups = target_aws_client.logs.describe_log_groups(
        logGroupNamePrefix=log_group_name
    )
    # The first group returned for the name prefix is used.
    log_group_arn = described_groups["logGroups"][0]["arn"]

    destination = LogDestination(
        cloudWatchLogsLogGroup=CloudWatchLogsLogGroup(logGroupArn=log_group_arn)
    )
    target_aws_client.stepfunctions.update_state_machine(
        stateMachineArn=state_machine_arn,
        loggingConfiguration=LoggingConfiguration(
            level=log_level,
            includeExecutionData=include_execution_data,
            destinations=[destination],
        ),
    )

    launch_and_record_logs(
        target_aws_client,
        state_machine_arn,
        execution_input,
        log_level,
        log_group_name,
        sfn_snapshot,
    )
620

621

622
def launch_and_record_sync_execution(
    target_aws_client,
    sfn_snapshot,
    state_machine_arn,
    execution_input,
):
    """Run ``start_sync_execution`` and snapshot its response as
    ``start_execution_sync_response``."""
    sfn_client = target_aws_client.stepfunctions
    sync_response = sfn_client.start_sync_execution(
        stateMachineArn=state_machine_arn,
        input=execution_input,
    )
    arn_transformer = sfn_snapshot.transform.sfn_sm_sync_exec_arn(sync_response, 0)
    sfn_snapshot.add_transformer(arn_transformer)
    sfn_snapshot.match("start_execution_sync_response", sync_response)
1✔
634

635

636
def create_and_record_express_sync_execution(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    sfn_snapshot,
    definition,
    execution_input,
):
    """Create an EXPRESS state machine, snapshot its creation response and
    record one synchronous execution of it."""
    role_arn = create_state_machine_iam_role(target_aws_client=target_aws_client)
    sfn_snapshot.add_transformer(RegexTransformer(role_arn, "sfn_role_arn"))

    created = create_state_machine(
        target_aws_client,
        name=f"express_statemachine_{short_uid()}",
        definition=definition,
        roleArn=role_arn,
        type=StateMachineType.EXPRESS,
    )
    express_arn = created["stateMachineArn"]
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_create_arn(created, 0))
    sfn_snapshot.match("creation_response", created)

    launch_and_record_sync_execution(
        target_aws_client, sfn_snapshot, express_arn, execution_input
    )
664

665

666
def launch_and_record_express_async_execution(
    target_aws_client,
    sfn_snapshot,
    state_machine_arn,
    log_group_name,
    execution_input,
):
    """Start an execution, wait for a terminal event in its logs and snapshot that event.

    :return: the ARN of the started execution.
    """
    exec_resp = target_aws_client.stepfunctions.start_execution(
        stateMachineArn=state_machine_arn, input=execution_input
    )
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_express_exec_arn(exec_resp, 0))
    execution_arn = exec_resp["executionArn"]

    logged_events = await_on_execution_logs(
        target_aws_client, log_group_name, validation_function=_is_last_history_event_terminal
    )
    # Snapshot only the end event, as AWS StepFunctions implements a flaky approach to logging previous events.
    sfn_snapshot.match("end_event", logged_events[-1])

    return execution_arn
1✔
687

688

689
def create_and_record_express_async_execution(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    sfn_create_log_group,
    sfn_snapshot,
    definition,
    execution_input,
    include_execution_data: bool = True,
) -> tuple[LongArn, LongArn]:
    """Create an EXPRESS state machine logging at level ALL to a new log group
    and record one asynchronous execution of it.

    :return: ``(state_machine_arn, execution_arn)``.
    """
    role_arn = create_state_machine_iam_role(target_aws_client)
    sfn_snapshot.add_transformer(RegexTransformer(role_arn, "sfn_role_arn"))

    log_group_name = sfn_create_log_group()
    described_groups = target_aws_client.logs.describe_log_groups(
        logGroupNamePrefix=log_group_name
    )
    # The first group returned for the name prefix is used.
    log_group_arn = described_groups["logGroups"][0]["arn"]

    destination = LogDestination(
        cloudWatchLogsLogGroup=CloudWatchLogsLogGroup(logGroupArn=log_group_arn)
    )
    logging_configuration = LoggingConfiguration(
        level=LogLevel.ALL,
        includeExecutionData=include_execution_data,
        destinations=[destination],
    )

    created = create_state_machine(
        target_aws_client,
        name=f"express_statemachine_{short_uid()}",
        definition=definition,
        roleArn=role_arn,
        type=StateMachineType.EXPRESS,
        loggingConfiguration=logging_configuration,
    )
    express_arn = created["stateMachineArn"]
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_create_arn(created, 0))
    sfn_snapshot.match("creation_response", created)

    execution_arn = launch_and_record_express_async_execution(
        target_aws_client, sfn_snapshot, express_arn, log_group_name, execution_input
    )
    return express_arn, execution_arn
1✔
736

737

738
def create_and_record_events(
    create_state_machine_iam_role,
    create_state_machine,
    sfn_events_to_sqs_queue,
    target_aws_client,
    sfn_snapshot,
    definition,
    execution_input,
):
    """Run an execution and snapshot the events received on the SQS queue
    returned by ``sfn_events_to_sqs_queue``.

    Messages are collected until the newest event (by its ``time`` entry)
    reports a ``detail.status`` other than ``RUNNING``. The poll result is not
    checked, so whatever was collected is snapshotted.
    """
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sqs_integration())
    # NOTE(review): the replacement for "$..detail.name" is a plain string, not
    # an f-string, so "{short_uid()}" is used literally — confirm this is intended.
    detail_replacements = (
        ("$..detail.startDate", "start-date"),
        ("$..detail.stopDate", "stop-date"),
        ("$..detail.name", "test_event_bridge_events-{short_uid()}"),
    )
    sfn_snapshot.add_transformers_list(
        [
            JsonpathTransformer(
                jsonpath=jsonpath,
                replacement=replacement,
                replace_reference=False,
            )
            for jsonpath, replacement in detail_replacements
        ]
    )

    role_arn = create_state_machine_iam_role(target_aws_client)
    create_output: CreateStateMachineOutput = create_state_machine(
        target_aws_client,
        name=f"test_event_bridge_events-{short_uid()}",
        definition=definition,
        roleArn=role_arn,
    )
    state_machine_arn = create_output["stateMachineArn"]

    queue_url = sfn_events_to_sqs_queue(state_machine_arn=state_machine_arn)

    sfn_client = target_aws_client.stepfunctions
    exec_resp = sfn_client.start_execution(
        stateMachineArn=state_machine_arn, input=execution_input
    )
    await_execution_terminated(
        stepfunctions_client=target_aws_client.stepfunctions,
        execution_arn=exec_resp["executionArn"],
    )

    stepfunctions_events = []

    def _collect_events():
        response = target_aws_client.sqs.receive_message(QueueUrl=queue_url)
        stepfunctions_events.extend(
            json.loads(message["Body"]) for message in response.get("Messages", [])
        )
        stepfunctions_events.sort(key=lambda item: item["time"])
        if not stepfunctions_events:
            # Nothing received yet: hand back the (falsy) empty list.
            return stepfunctions_events
        return stepfunctions_events[-1]["detail"]["status"] != "RUNNING"

    poll_condition(_collect_events, timeout=60)

    sfn_snapshot.match("stepfunctions_events", stepfunctions_events)
1✔
800

801

802
def record_sqs_events(target_aws_client, queue_url, sfn_snapshot, num_events):
    """Collect message bodies from ``queue_url`` until exactly ``num_events``
    have been gathered, then snapshot them.

    The poll result is not checked. Before the snapshot the events are ordered
    by the JSON dump of their ``detail`` entry.
    """
    collected = []

    def _collect_events() -> bool:
        response = target_aws_client.sqs.receive_message(QueueUrl=queue_url)
        collected.extend(json.loads(message["Body"]) for message in response.get("Messages", []))
        collected.sort(key=lambda item: item["time"])
        return len(collected) == num_events

    poll_condition(_collect_events, timeout=60)
    collected.sort(key=lambda item: json.dumps(item.get("detail", {})))
    sfn_snapshot.match("stepfunctions_events", collected)
1✔
816

817

818
class SfnNoneRecursiveParallelTransformer:
    """
    Normalises a sublist of events triggered by a Parallel state to be order-independent.

    Events found between a ``ParallelStateStarted`` event and the next
    Parallel-state Succeeded/Aborted/Exited/Failed event have their ``id`` and
    ``previousEventId`` overwritten and are then sorted by their JSON string form.
    """

    def __init__(self, events_jsonpath: str = "$..events"):
        """
        :param events_jsonpath: jsonpath selecting the event lists to normalise.
        """
        self.events_jsonpath: str = events_jsonpath

    @staticmethod
    def _normalise_events(events: list[dict]) -> None:
        """Sort, in place, each run of events enclosed by a Parallel state.

        :param events: list of history events; it is modified in place.
        """
        start_idx = None
        sublist = list()
        in_sublist = False
        for i, event in enumerate(events):
            event_type = event.get("type")
            if event_type is None:
                LOG.debug(
                    "No 'type' in event item '%s'.",
                    event,
                )
                in_sublist = False

            # event_type cannot be None here: that case is handled above.
            elif event_type in {
                HistoryEventType.ParallelStateSucceeded,
                HistoryEventType.ParallelStateAborted,
                HistoryEventType.ParallelStateExited,
                HistoryEventType.ParallelStateFailed,
            }:
                # NOTE(review): this slice assignment also runs when not inside
                # a sublist (e.g. for a second closing Parallel event in a row,
                # or with start_idx still None) — confirm this is intended.
                events[start_idx:i] = sorted(sublist, key=to_json_str)
                in_sublist = False
            elif event_type == HistoryEventType.ParallelStateStarted:
                in_sublist = True
                sublist = []
                start_idx = i + 1
            elif in_sublist:
                # Overwrite the order-dependent identifiers before sorting.
                # NOTE(review): "id" becomes the tuple (0,) while
                # "previousEventId" becomes the int 0 — confirm the tuple is intended.
                event["id"] = (0,)
                event["previousEventId"] = 0
                sublist.append(event)

    def transform(self, input_data: dict, *, ctx: TransformContext) -> dict:
        """Normalise every event list selected by ``self.events_jsonpath``.

        :param input_data: data to transform; matched event lists are modified in place.
        :param ctx: transform context (not used by this transformer).
        :return: ``input_data``.
        """
        # Use the configured jsonpath instead of a hard-coded "$..events".
        pattern = parse(self.events_jsonpath)
        events = pattern.find(input_data)
        if not events:
            LOG.debug("No Stepfunctions 'events' for jsonpath '%s'.", self.events_jsonpath)
            return input_data

        for events_data in events:
            self._normalise_events(events_data.value)

        return input_data
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc