• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 4c016940-14bb-4fba-a49d-427f621d8d2d

10 Mar 2025 11:34PM UTC coverage: 86.901% (-0.03%) from 86.929%
4c016940-14bb-4fba-a49d-427f621d8d2d

push

circleci

web-flow
Update CODEOWNERS (#12359)

Co-authored-by: LocalStack Bot <localstack-bot@users.noreply.github.com>

62118 of 71481 relevant lines covered (86.9%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.37
/localstack-core/localstack/testing/pytest/stepfunctions/utils.py
1
import json
1✔
2
import logging
1✔
3
from typing import Callable, Final, Optional
1✔
4

5
from botocore.exceptions import ClientError
1✔
6
from jsonpath_ng.ext import parse
1✔
7
from localstack_snapshot.snapshots.transformer import (
1✔
8
    JsonpathTransformer,
9
    RegexTransformer,
10
    TransformContext,
11
)
12

13
from localstack.aws.api.stepfunctions import (
1✔
14
    Arn,
15
    CloudWatchLogsLogGroup,
16
    CreateStateMachineOutput,
17
    Definition,
18
    ExecutionStatus,
19
    HistoryEventList,
20
    HistoryEventType,
21
    LogDestination,
22
    LoggingConfiguration,
23
    LogLevel,
24
    LongArn,
25
    StateMachineType,
26
)
27
from localstack.services.stepfunctions.asl.eval.event.logging import is_logging_enabled_for
1✔
28
from localstack.services.stepfunctions.asl.utils.encoding import to_json_str
1✔
29
from localstack.services.stepfunctions.asl.utils.json_path import NoSuchJsonPathError, extract_json
1✔
30
from localstack.utils.strings import short_uid
1✔
31
from localstack.utils.sync import poll_condition
1✔
32

33
LOG = logging.getLogger(__name__)
1✔
34

35

36
# For EXPRESS state machines, the deletion will happen eventually (usually less than a minute).
37
# Running executions may emit logs after DeleteStateMachine API is called.
38
_DELETION_TIMEOUT_SECS: Final[int] = 120
1✔
39

40

41
def await_no_state_machines_listed(stepfunctions_client):
    """Poll ``list_state_machines`` until the response holds no state machines.

    The wait is bounded by ``_DELETION_TIMEOUT_SECS``; on timeout a warning is
    logged and the function returns normally.

    :param stepfunctions_client: client exposing ``list_state_machines()``.
    """

    def _no_state_machines_left():
        listing = stepfunctions_client.list_state_machines()
        return not listing["stateMachines"]

    emptied = poll_condition(
        condition=_no_state_machines_left, timeout=_DELETION_TIMEOUT_SECS, interval=1
    )
    if emptied:
        return
    LOG.warning("Timed out whilst awaiting for listing to be empty.")
×
54

55

56
def _is_state_machine_alias_listed(
    stepfunctions_client, state_machine_arn: Arn, state_machine_alias_arn: Arn
):
    """Tell whether an alias ARN appears in a state machine's alias listing.

    Only the single ``list_state_machine_aliases`` response is inspected.

    :param stepfunctions_client: client exposing ``list_state_machine_aliases``.
    :param state_machine_arn: ARN of the state machine whose aliases are listed.
    :param state_machine_alias_arn: alias ARN to look for.
    :return: True if an entry's ``stateMachineAliasArn`` equals the given ARN.
    """
    response = stepfunctions_client.list_state_machine_aliases(stateMachineArn=state_machine_arn)
    return any(
        alias["stateMachineAliasArn"] == state_machine_alias_arn
        for alias in response["stateMachineAliases"]
    )
1✔
67

68

69
def await_state_machine_alias_is_created(
    stepfunctions_client, state_machine_arn: Arn, state_machine_alias_arn: Arn
):
    """Poll until the alias shows up in the state machine's alias listing.

    The wait is bounded by ``_DELETION_TIMEOUT_SECS``; on timeout a warning is
    logged and the function returns normally.

    :param stepfunctions_client: client exposing ``list_state_machine_aliases``.
    :param state_machine_arn: ARN of the state machine owning the alias.
    :param state_machine_alias_arn: ARN of the alias expected to be listed.
    """
    success = poll_condition(
        condition=lambda: _is_state_machine_alias_listed(
            stepfunctions_client=stepfunctions_client,
            state_machine_arn=state_machine_arn,
            state_machine_alias_arn=state_machine_alias_arn,
        ),
        timeout=_DELETION_TIMEOUT_SECS,
        interval=1,
    )
    if not success:
        # The previous message was copied from the "no state machines listed"
        # helper and did not describe what this function was waiting for.
        LOG.warning(
            "Timed out whilst awaiting for alias '%s' of '%s' to be listed.",
            state_machine_alias_arn,
            state_machine_arn,
        )
×
83

84

85
def await_state_machine_alias_is_deleted(
    stepfunctions_client, state_machine_arn: Arn, state_machine_alias_arn: Arn
):
    """Poll until the alias no longer appears in the state machine's alias listing.

    The wait is bounded by ``_DELETION_TIMEOUT_SECS``; on timeout a warning is
    logged and the function returns normally.

    :param stepfunctions_client: client exposing ``list_state_machine_aliases``.
    :param state_machine_arn: ARN of the state machine owning the alias.
    :param state_machine_alias_arn: ARN of the alias expected to disappear.
    """
    success = poll_condition(
        condition=lambda: not _is_state_machine_alias_listed(
            stepfunctions_client=stepfunctions_client,
            state_machine_arn=state_machine_arn,
            state_machine_alias_arn=state_machine_alias_arn,
        ),
        timeout=_DELETION_TIMEOUT_SECS,
        interval=1,
    )
    if not success:
        # The previous message was copied from the "no state machines listed"
        # helper and did not describe what this function was waiting for.
        LOG.warning(
            "Timed out whilst awaiting for alias '%s' of '%s' to be unlisted.",
            state_machine_alias_arn,
            state_machine_arn,
        )
×
99

100

101
def _is_state_machine_listed(stepfunctions_client, state_machine_arn: str) -> bool:
1✔
102
    lst_resp = stepfunctions_client.list_state_machines()
1✔
103
    state_machines = lst_resp["stateMachines"]
1✔
104
    for state_machine in state_machines:
1✔
105
        if state_machine["stateMachineArn"] == state_machine_arn:
1✔
106
            return True
1✔
107
    return False
1✔
108

109

110
def _is_state_machine_version_listed(
1✔
111
    stepfunctions_client, state_machine_arn: str, state_machine_version_arn: str
112
) -> bool:
113
    lst_resp = stepfunctions_client.list_state_machine_versions(stateMachineArn=state_machine_arn)
1✔
114
    versions = lst_resp["stateMachineVersions"]
1✔
115
    for version in versions:
1✔
116
        if version["stateMachineVersionArn"] == state_machine_version_arn:
1✔
117
            return True
1✔
118
    return False
1✔
119

120

121
def await_state_machine_not_listed(stepfunctions_client, state_machine_arn: str):
    """Poll until the state machine ARN is absent from ``list_state_machines``.

    The wait is bounded by ``_DELETION_TIMEOUT_SECS``; on timeout a warning is
    logged and the function returns normally.
    """

    def _is_absent():
        return not _is_state_machine_listed(stepfunctions_client, state_machine_arn)

    removed = poll_condition(condition=_is_absent, timeout=_DELETION_TIMEOUT_SECS, interval=1)
    if removed:
        return
    LOG.warning("Timed out whilst awaiting for listing to exclude '%s'.", state_machine_arn)
×
129

130

131
def await_state_machine_listed(stepfunctions_client, state_machine_arn: str):
    """Poll until the state machine ARN is present in ``list_state_machines``.

    The wait is bounded by ``_DELETION_TIMEOUT_SECS``; on timeout a warning is
    logged and the function returns normally.
    """

    def _is_present():
        return _is_state_machine_listed(stepfunctions_client, state_machine_arn)

    appeared = poll_condition(condition=_is_present, timeout=_DELETION_TIMEOUT_SECS, interval=1)
    if appeared:
        return
    LOG.warning("Timed out whilst awaiting for listing to include '%s'.", state_machine_arn)
×
139

140

141
def await_state_machine_version_not_listed(
    stepfunctions_client, state_machine_arn: str, state_machine_version_arn: str
):
    """Poll until the version ARN is absent from the state machine's version listing.

    The wait is bounded by ``_DELETION_TIMEOUT_SECS``; on timeout a warning is
    logged and the function returns normally.
    """

    def _is_absent():
        return not _is_state_machine_version_listed(
            stepfunctions_client, state_machine_arn, state_machine_version_arn
        )

    removed = poll_condition(condition=_is_absent, timeout=_DELETION_TIMEOUT_SECS, interval=1)
    if removed:
        return
    LOG.warning(
        "Timed out whilst awaiting for version of %s to exclude '%s'.",
        state_machine_arn,
        state_machine_version_arn,
    )
157

158

159
def await_state_machine_version_listed(
    stepfunctions_client, state_machine_arn: str, state_machine_version_arn: str
):
    """Poll until the version ARN is present in the state machine's version listing.

    The wait is bounded by ``_DELETION_TIMEOUT_SECS``; on timeout a warning is
    logged and the function returns normally.
    """

    def _is_present():
        return _is_state_machine_version_listed(
            stepfunctions_client, state_machine_arn, state_machine_version_arn
        )

    appeared = poll_condition(condition=_is_present, timeout=_DELETION_TIMEOUT_SECS, interval=1)
    if appeared:
        return
    LOG.warning(
        "Timed out whilst awaiting for version of %s to include '%s'.",
        state_machine_arn,
        state_machine_version_arn,
    )
175

176

177
def await_on_execution_events(
    stepfunctions_client, execution_arn: str, check_func: Callable[[HistoryEventList], bool]
) -> HistoryEventList:
    """Poll an execution's history until ``check_func`` accepts it.

    On every attempt the history is fetched with ``get_execution_history``,
    sorted by each event's ``timestamp`` and handed to ``check_func``. A
    ``ClientError`` from the fetch counts as a failed attempt.

    :param stepfunctions_client: client exposing ``get_execution_history``.
    :param execution_arn: ARN of the execution whose history is polled.
    :param check_func: predicate over the sorted event list.
    :return: the event list of the last attempt.
    :raises AssertionError: if the condition is not met within 120 seconds.
    """
    events: HistoryEventList = []

    def _run_check() -> bool:
        # The same list object is reused so the caller gets the latest snapshot.
        events.clear()
        try:
            hist_resp = stepfunctions_client.get_execution_history(executionArn=execution_arn)
        except ClientError:
            return False
        events.extend(sorted(hist_resp.get("events", []), key=lambda event: event.get("timestamp")))
        return check_func(events)

    # Poll outside of an `assert` statement: asserts are stripped under
    # `python -O`, which would otherwise skip the polling altogether.
    success = poll_condition(condition=_run_check, timeout=120, interval=1)
    if not success:
        raise AssertionError(
            f"Timed out whilst awaiting for the events of execution '{execution_arn}' "
            "to satisfy the given condition."
        )
    return events
1✔
195

196

197
def await_execution_success(stepfunctions_client, execution_arn: str) -> HistoryEventList:
    """Wait until the last history event carries ``executionSucceededEventDetails``.

    Delegates the polling to ``await_on_execution_events``.

    :return: the event list returned by ``await_on_execution_events``.
    """

    def _last_event_is_success(history: HistoryEventList) -> bool:
        if not history:
            return False
        return "executionSucceededEventDetails" in history[-1]

    return await_on_execution_events(
        stepfunctions_client=stepfunctions_client,
        execution_arn=execution_arn,
        check_func=_last_event_is_success,
    )
209

210

211
def await_list_execution_status(
    stepfunctions_client, state_machine_arn: str, execution_arn: str, status: str
):
    """Poll ``list_executions`` until the execution is listed with the given status.

    Required as there is some eventual consistency in list_executions vs
    describe_execution and get_execution_history. On timeout (120 seconds) a
    warning is logged and the function returns normally.
    """

    def _execution_has_status():
        listing = stepfunctions_client.list_executions(
            stateMachineArn=state_machine_arn, statusFilter=status
        )
        return any(
            item["executionArn"] == execution_arn and item["status"] == status
            for item in listing.get("executions", [])
        )

    reached = poll_condition(condition=_execution_has_status, timeout=120, interval=1)
    if reached:
        return
    LOG.warning(
        "Timed out whilst awaiting for execution status %s to satisfy condition for execution '%s'.",
        status,
        execution_arn,
    )
233

234

235
def _is_last_history_event_terminal(events: HistoryEventList) -> bool:
    """Tell whether the final event in ``events`` ends an execution.

    An empty list is never terminal. A final event without a ``type`` key is
    treated as terminal; otherwise the type must be one of the failed, aborted,
    timed-out or succeeded execution event types.
    """
    if not events:
        return False

    final_type = events[-1].get("type")
    if final_type is None:
        return True

    terminal_types = {
        HistoryEventType.ExecutionFailed,
        HistoryEventType.ExecutionAborted,
        HistoryEventType.ExecutionTimedOut,
        HistoryEventType.ExecutionSucceeded,
    }
    return final_type in terminal_types
1✔
246

247

248
def await_execution_terminated(stepfunctions_client, execution_arn: str) -> HistoryEventList:
    """Wait until the execution's last history event is terminal.

    Delegates to ``await_on_execution_events`` with
    ``_is_last_history_event_terminal`` as the predicate.

    :return: the event list returned by ``await_on_execution_events``.
    """
    history = await_on_execution_events(
        stepfunctions_client, execution_arn, _is_last_history_event_terminal
    )
    return history
254

255

256
def await_execution_lists_terminated(
    stepfunctions_client, state_machine_arn: str, execution_arn: str
):
    """Poll ``list_executions`` until the execution is listed with a non-RUNNING status.

    An execution that is not listed at all counts as "not yet terminated". On
    timeout (120 seconds) a warning is logged and the function returns normally.
    """

    def _listed_execution_stopped_running() -> bool:
        listing = stepfunctions_client.list_executions(stateMachineArn=state_machine_arn)
        matching = next(
            (item for item in listing["executions"] if item["executionArn"] == execution_arn),
            None,
        )
        if matching is None:
            return False
        return matching["status"] != ExecutionStatus.RUNNING

    stopped = poll_condition(condition=_listed_execution_stopped_running, timeout=120, interval=1)
    if stopped:
        return
    LOG.warning(
        "Timed out whilst awaiting for execution events to satisfy condition for execution '%s'.",
        execution_arn,
    )
273

274

275
def await_execution_started(stepfunctions_client, execution_arn: str) -> HistoryEventList:
    """Wait until the execution history contains an execution-started event.

    Delegates the polling to ``await_on_execution_events``.

    :return: the event list returned by ``await_on_execution_events``.
    """

    def _check_started_exists(events: HistoryEventList) -> bool:
        # The previous loop returned on its first iteration and therefore only
        # ever inspected events[0]; check every event instead.
        return any("executionStartedEventDetails" in event for event in events)

    return await_on_execution_events(
        stepfunctions_client=stepfunctions_client,
        execution_arn=execution_arn,
        check_func=_check_started_exists,
    )
286

287

288
def await_execution_aborted(stepfunctions_client, execution_arn: str):
    """Poll ``describe_execution`` until the execution status is ABORTED.

    On timeout (120 seconds) a warning is logged and the function returns
    normally.
    """

    def _is_aborted():
        description = stepfunctions_client.describe_execution(executionArn=execution_arn)
        return description["status"] == ExecutionStatus.ABORTED

    aborted = poll_condition(condition=_is_aborted, timeout=120, interval=1)
    if aborted:
        return
    LOG.warning("Timed out whilst awaiting for execution '%s' to abort.", execution_arn)
×
297

298

299
def get_expected_execution_logs(
    stepfunctions_client, log_level: LogLevel, execution_arn: LongArn
) -> HistoryEventList:
    """Return the history events for which logging is enabled at ``log_level``.

    The history is fetched once with ``get_execution_history``; each event is
    kept when ``is_logging_enabled_for`` accepts its ``type`` for the level.
    """
    history = stepfunctions_client.get_execution_history(executionArn=execution_arn)
    loggable_events = []
    for history_event in history["events"]:
        if is_logging_enabled_for(log_level=log_level, history_event_type=history_event["type"]):
            loggable_events.append(history_event)
    return loggable_events
1✔
310

311

312
def is_execution_logs_list_complete(
    expected_events: HistoryEventList,
) -> Callable[[HistoryEventList], bool]:
    """Build a predicate checking that as many log events arrived as expected.

    The returned function accepts anything when ``expected_events`` is empty;
    otherwise it compares only the lengths of the two lists.
    """

    def _validation_function(log_events: list) -> bool:
        return not expected_events or len(log_events) == len(expected_events)

    return _validation_function
1✔
321

322

323
def _await_on_execution_log_stream_created(target_aws_client, log_group_name: str) -> str:
    """Poll the log group until a log stream other than the validation stream exists.

    :param target_aws_client: client factory exposing a ``logs`` client.
    :param log_group_name: name of the log group to inspect.
    :return: name of the selected log stream.
    :raises AssertionError: if polling ends without finding such a stream.
    """
    logs_client = target_aws_client.logs
    # Stream name that the original check already treated as "not the execution stream".
    validation_stream_name = "log_stream_created_by_aws_to_validate_log_delivery_subscriptions"
    log_stream_name = ""

    def _run_check() -> bool:
        nonlocal log_stream_name
        try:
            log_streams = logs_client.describe_log_streams(logGroupName=log_group_name)[
                "logStreams"
            ]
        except ClientError:
            return False

        # Walk from the end so the last listed stream is still preferred, but do
        # not give up when the validation stream happens to be listed last:
        # SFN may not yet have created the log stream for the execution, or it
        # may simply be listed before the validation stream.
        for log_stream in reversed(log_streams):
            candidate = log_stream["logStreamName"]
            if candidate != validation_stream_name:
                log_stream_name = candidate
                return True
        return False

    # Poll outside of an `assert` statement: asserts are stripped under
    # `python -O`, which would otherwise skip the polling altogether.
    if not poll_condition(condition=_run_check):
        raise AssertionError(
            f"No execution log stream was created in log group '{log_group_name}'."
        )
    return log_stream_name
1✔
349

350

351
def await_on_execution_logs(
    target_aws_client,
    log_group_name: str,
    validation_function: Optional[Callable[[HistoryEventList], bool]] = None,
) -> HistoryEventList:
    """Poll the execution's log stream until the logged events pass validation.

    The stream name is obtained from ``_await_on_execution_log_stream_created``.
    On every attempt the log events are fetched from the head of the stream and
    each ``message`` is parsed as JSON. A ``ClientError`` from the fetch counts
    as a failed attempt.

    :param target_aws_client: client factory exposing a ``logs`` client.
    :param log_group_name: name of the log group holding the execution logs.
    :param validation_function: predicate over the parsed events. When omitted,
        the first successful retrieval is accepted (previously the ``None``
        default was called and raised ``TypeError``).
    :return: the parsed log events of the last attempt.
    :raises AssertionError: if polling ends without the validation passing.
    """
    log_stream_name = _await_on_execution_log_stream_created(target_aws_client, log_group_name)

    logs_client = target_aws_client.logs
    events: HistoryEventList = []

    def _run_check() -> bool:
        # The same list object is reused so the caller gets the latest snapshot.
        events.clear()
        try:
            log_events = logs_client.get_log_events(
                logGroupName=log_group_name, logStreamName=log_stream_name, startFromHead=True
            )["events"]
        except ClientError:
            return False
        events.extend(json.loads(log_event["message"]) for log_event in log_events)

        if validation_function is None:
            return True
        return validation_function(events)

    # Poll outside of an `assert` statement: asserts are stripped under
    # `python -O`, which would otherwise skip the polling altogether.
    if not poll_condition(condition=_run_check):
        raise AssertionError(
            f"Logged events in log group '{log_group_name}' did not satisfy the validation."
        )
    return events
1✔
377

378

379
def create_state_machine_with_iam_role(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    snapshot,
    definition: Definition,
    logging_configuration: Optional[LoggingConfiguration] = None,
):
    """Create an IAM role and a state machine, registering snapshot transformers.

    Transformers are added for the role ARN, for "Extended Request ID" and
    "Request ID" fragments, and for the ARN in the creation response.

    :param target_aws_client: client factory passed through to the two callables.
    :param create_state_machine_iam_role: callable returning the role ARN.
    :param create_state_machine: callable creating the state machine.
    :param snapshot: snapshot object receiving the transformers.
    :param definition: state machine definition.
    :param logging_configuration: only forwarded when not None.
    :return: the ``stateMachineArn`` of the creation response.
    """
    snf_role_arn = create_state_machine_iam_role(target_aws_client=target_aws_client)

    snapshot.add_transformer(RegexTransformer(snf_role_arn, "snf_role_arn"))
    extended_request_id_transformer = RegexTransformer(
        "Extended Request ID: [a-zA-Z0-9-/=+]+",
        "Extended Request ID: <extended_request_id>",
    )
    snapshot.add_transformer(extended_request_id_transformer)
    request_id_transformer = RegexTransformer(
        "Request ID: [a-zA-Z0-9-]+", "Request ID: <request_id>"
    )
    snapshot.add_transformer(request_id_transformer)

    sm_name: str = f"statemachine_create_and_record_execution_{short_uid()}"
    create_arguments = dict(name=sm_name, definition=definition, roleArn=snf_role_arn)
    if logging_configuration is not None:
        create_arguments["loggingConfiguration"] = logging_configuration

    creation_resp = create_state_machine(target_aws_client, **create_arguments)
    snapshot.add_transformer(snapshot.transform.sfn_sm_create_arn(creation_resp, 0))
    return creation_resp["stateMachineArn"]
1✔
411

412

413
def launch_and_record_execution(
    target_aws_client,
    sfn_snapshot,
    state_machine_arn,
    execution_input,
    verify_execution_description=False,
) -> LongArn:
    """Start an execution, wait for it to terminate and snapshot its history.

    :param target_aws_client: client factory exposing a ``stepfunctions`` client.
    :param sfn_snapshot: snapshot object receiving transformers and matches.
    :param state_machine_arn: ARN of the state machine to execute.
    :param execution_input: input passed to ``start_execution``.
    :param verify_execution_description: when true, the ``describe_execution``
        response is snapshotted as well.
    :return: the ARN of the started execution.
    """
    stepfunctions_client = target_aws_client.stepfunctions
    exec_resp = stepfunctions_client.start_execution(
        stateMachineArn=state_machine_arn, input=execution_input
    )
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_exec_arn(exec_resp, 0))
    execution_arn = exec_resp["executionArn"]

    await_execution_terminated(
        stepfunctions_client=stepfunctions_client, execution_arn=execution_arn
    )

    if verify_execution_description:
        describe_execution = stepfunctions_client.describe_execution(executionArn=execution_arn)
        sfn_snapshot.match("describe_execution", describe_execution)

    get_execution_history = stepfunctions_client.get_execution_history(executionArn=execution_arn)

    # Transform all map runs if any.
    try:
        map_run_arns = extract_json("$..mapRunArn", get_execution_history)
        if isinstance(map_run_arns, str):
            map_run_arns = [map_run_arns]
        # De-duplicate while keeping first-seen order: iterating a set would
        # assign the transformer indices in an arbitrary, run-dependent order.
        for i, map_run_arn in enumerate(dict.fromkeys(map_run_arns)):
            sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_map_run_arn(map_run_arn, i))
    except NoSuchJsonPathError:
        # No mapRunArns
        pass

    sfn_snapshot.match("get_execution_history", get_execution_history)

    return execution_arn
1✔
451

452

453
def launch_and_record_logs(
    target_aws_client,
    state_machine_arn,
    execution_input,
    log_level,
    log_group_name,
    sfn_snapshot,
):
    """Run an execution and snapshot the events that were written to its logs.

    Returns early, without snapshotting logs, when the level is ``LogLevel.OFF``
    or no history event is expected to be logged at the given level.
    """
    execution_arn = launch_and_record_execution(
        target_aws_client=target_aws_client,
        sfn_snapshot=sfn_snapshot,
        state_machine_arn=state_machine_arn,
        execution_input=execution_input,
    )
    expected_events = get_expected_execution_logs(
        stepfunctions_client=target_aws_client.stepfunctions,
        log_level=log_level,
        execution_arn=execution_arn,
    )

    if log_level == LogLevel.OFF or not expected_events:
        # The test should terminate here, as no log streams for this execution would have been created.
        return

    logged_execution_events = await_on_execution_logs(
        target_aws_client,
        log_group_name,
        is_execution_logs_list_complete(expected_events),
    )

    timestamp_transformer = JsonpathTransformer(
        jsonpath="$..event_timestamp",
        replacement="timestamp",
        replace_reference=False,
    )
    sfn_snapshot.add_transformer(timestamp_transformer)
    sfn_snapshot.match("logged_execution_events", logged_execution_events)
1✔
488

489

490
def create_and_record_execution(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    sfn_snapshot,
    definition,
    execution_input,
    verify_execution_description=False,
) -> LongArn:
    """Create a state machine, run one execution and snapshot its history.

    Combines ``create_state_machine_with_iam_role`` and
    ``launch_and_record_execution``.

    :return: the ARN of the recorded execution, for manual assertions. Earlier
        versions returned None, so callers ignoring the result are unaffected.
    """
    state_machine_arn = create_state_machine_with_iam_role(
        target_aws_client,
        create_state_machine_iam_role,
        create_state_machine,
        sfn_snapshot,
        definition,
    )
    execution_arn = launch_and_record_execution(
        target_aws_client,
        sfn_snapshot,
        state_machine_arn,
        execution_input,
        verify_execution_description,
    )
    return execution_arn
514

515

516
def create_and_record_logs(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    sfn_create_log_group,
    sfn_snapshot,
    definition,
    execution_input,
    log_level: LogLevel,
    include_execution_data: bool,
):
    """Create a state machine, point its logging at a new log group and record the logs.

    The state machine is created first and its logging configuration is applied
    afterwards through ``update_state_machine``; the execution and log
    snapshotting are delegated to ``launch_and_record_logs``.
    """
    state_machine_arn = create_state_machine_with_iam_role(
        target_aws_client,
        create_state_machine_iam_role,
        create_state_machine,
        sfn_snapshot,
        definition,
    )

    log_group_name = sfn_create_log_group()
    described_groups = target_aws_client.logs.describe_log_groups(
        logGroupNamePrefix=log_group_name
    )
    log_group_arn = described_groups["logGroups"][0]["arn"]

    destination = LogDestination(
        cloudWatchLogsLogGroup=CloudWatchLogsLogGroup(logGroupArn=log_group_arn)
    )
    logging_configuration = LoggingConfiguration(
        level=log_level,
        includeExecutionData=include_execution_data,
        destinations=[destination],
    )
    target_aws_client.stepfunctions.update_state_machine(
        stateMachineArn=state_machine_arn, loggingConfiguration=logging_configuration
    )

    launch_and_record_logs(
        target_aws_client=target_aws_client,
        state_machine_arn=state_machine_arn,
        execution_input=execution_input,
        log_level=log_level,
        log_group_name=log_group_name,
        sfn_snapshot=sfn_snapshot,
    )
560

561

562
def launch_and_record_sync_execution(
    target_aws_client,
    sfn_snapshot,
    state_machine_arn,
    execution_input,
):
    """Run ``start_sync_execution`` and snapshot its response.

    A transformer for the execution ARN in the response is registered before
    the response is matched under ``start_execution_sync_response``.
    """
    sfn_client = target_aws_client.stepfunctions
    sync_response = sfn_client.start_sync_execution(
        stateMachineArn=state_machine_arn,
        input=execution_input,
    )
    arn_transformer = sfn_snapshot.transform.sfn_sm_sync_exec_arn(sync_response, 0)
    sfn_snapshot.add_transformer(arn_transformer)
    sfn_snapshot.match("start_execution_sync_response", sync_response)
1✔
574

575

576
def create_and_record_express_sync_execution(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    sfn_snapshot,
    definition,
    execution_input,
):
    """Create an EXPRESS state machine and snapshot one synchronous execution.

    The creation response is matched under ``creation_response``; the
    execution itself is delegated to ``launch_and_record_sync_execution``.
    """
    snf_role_arn = create_state_machine_iam_role(target_aws_client=target_aws_client)
    sfn_snapshot.add_transformer(RegexTransformer(snf_role_arn, "sfn_role_arn"))

    machine_name = f"express_statemachine_{short_uid()}"
    creation_response = create_state_machine(
        target_aws_client,
        name=machine_name,
        definition=definition,
        roleArn=snf_role_arn,
        type=StateMachineType.EXPRESS,
    )
    state_machine_arn = creation_response["stateMachineArn"]
    create_arn_transformer = sfn_snapshot.transform.sfn_sm_create_arn(creation_response, 0)
    sfn_snapshot.add_transformer(create_arn_transformer)
    sfn_snapshot.match("creation_response", creation_response)

    launch_and_record_sync_execution(
        target_aws_client=target_aws_client,
        sfn_snapshot=sfn_snapshot,
        state_machine_arn=state_machine_arn,
        execution_input=execution_input,
    )
604

605

606
def launch_and_record_express_async_execution(
    target_aws_client,
    sfn_snapshot,
    state_machine_arn,
    log_group_name,
    execution_input,
):
    """Start an execution and snapshot the terminal event found in its logs.

    The logs are polled through ``await_on_execution_logs`` until the last
    logged event is terminal; only that last event is matched.

    :return: the ARN of the started execution.
    """
    sfn_client = target_aws_client.stepfunctions
    start_execution = sfn_client.start_execution(
        stateMachineArn=state_machine_arn, input=execution_input
    )
    exec_arn_transformer = sfn_snapshot.transform.sfn_sm_express_exec_arn(start_execution, 0)
    sfn_snapshot.add_transformer(exec_arn_transformer)
    execution_arn = start_execution["executionArn"]

    logged_events = await_on_execution_logs(
        target_aws_client, log_group_name, validation_function=_is_last_history_event_terminal
    )
    # Snapshot only the end event, as AWS StepFunctions implements a flaky approach to logging previous events.
    sfn_snapshot.match("end_event", logged_events[-1])

    return execution_arn
1✔
627

628

629
def create_and_record_express_async_execution(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    sfn_create_log_group,
    sfn_snapshot,
    definition,
    execution_input,
    include_execution_data: bool = True,
) -> tuple[LongArn, LongArn]:
    """Create a logging EXPRESS state machine and snapshot one asynchronous execution.

    The state machine is created with ``LogLevel.ALL`` logging into a newly
    created log group; the creation response is matched under
    ``creation_response`` and the execution is delegated to
    ``launch_and_record_express_async_execution``.

    :return: ``(state_machine_arn, execution_arn)``.
    """
    snf_role_arn = create_state_machine_iam_role(target_aws_client)
    sfn_snapshot.add_transformer(RegexTransformer(snf_role_arn, "sfn_role_arn"))

    log_group_name = sfn_create_log_group()
    described_groups = target_aws_client.logs.describe_log_groups(
        logGroupNamePrefix=log_group_name
    )
    log_group_arn = described_groups["logGroups"][0]["arn"]

    destination = LogDestination(
        cloudWatchLogsLogGroup=CloudWatchLogsLogGroup(logGroupArn=log_group_arn)
    )
    logging_configuration = LoggingConfiguration(
        level=LogLevel.ALL,
        includeExecutionData=include_execution_data,
        destinations=[destination],
    )

    machine_name = f"express_statemachine_{short_uid()}"
    creation_response = create_state_machine(
        target_aws_client,
        name=machine_name,
        definition=definition,
        roleArn=snf_role_arn,
        type=StateMachineType.EXPRESS,
        loggingConfiguration=logging_configuration,
    )
    state_machine_arn = creation_response["stateMachineArn"]
    create_arn_transformer = sfn_snapshot.transform.sfn_sm_create_arn(creation_response, 0)
    sfn_snapshot.add_transformer(create_arn_transformer)
    sfn_snapshot.match("creation_response", creation_response)

    execution_arn = launch_and_record_express_async_execution(
        target_aws_client=target_aws_client,
        sfn_snapshot=sfn_snapshot,
        state_machine_arn=state_machine_arn,
        log_group_name=log_group_name,
        execution_input=execution_input,
    )
    return (state_machine_arn, execution_arn)
1✔
676

677

678
def create_and_record_events(
    create_state_machine_iam_role,
    create_state_machine,
    sfn_events_to_sqs_queue,
    target_aws_client,
    sfn_snapshot,
    definition,
    execution_input,
):
    """Run an execution and snapshot the events delivered to an SQS queue.

    A state machine is created, ``sfn_events_to_sqs_queue`` provides the queue
    URL for it, and one execution is started and awaited. The queue is then
    polled (up to 60 seconds) until the latest received event, ordered by
    ``time``, has a ``detail.status`` other than ``RUNNING``. All received
    event bodies are matched under ``stepfunctions_events``.
    """
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sqs_integration())
    sfn_snapshot.add_transformers_list(
        [
            JsonpathTransformer(
                jsonpath="$..detail.startDate",
                replacement="start-date",
                replace_reference=False,
            ),
            JsonpathTransformer(
                jsonpath="$..detail.stopDate",
                replacement="stop-date",
                replace_reference=False,
            ),
            JsonpathTransformer(
                jsonpath="$..detail.name",
                # NOTE(review): this is a plain string, not an f-string, so the
                # replacement is the literal text below. Kept unchanged because
                # recorded snapshots presumably contain it - confirm before
                # altering.
                replacement="test_event_bridge_events-{short_uid()}",
                replace_reference=False,
            ),
        ]
    )

    snf_role_arn = create_state_machine_iam_role(target_aws_client)
    create_output: CreateStateMachineOutput = create_state_machine(
        target_aws_client,
        name=f"test_event_bridge_events-{short_uid()}",
        definition=definition,
        roleArn=snf_role_arn,
    )
    state_machine_arn = create_output["stateMachineArn"]

    queue_url = sfn_events_to_sqs_queue(state_machine_arn=state_machine_arn)

    start_execution = target_aws_client.stepfunctions.start_execution(
        stateMachineArn=state_machine_arn, input=execution_input
    )
    execution_arn = start_execution["executionArn"]
    await_execution_terminated(
        stepfunctions_client=target_aws_client.stepfunctions, execution_arn=execution_arn
    )

    stepfunctions_events = []

    def _get_events() -> bool:
        # Events accumulate across attempts; each attempt appends what it received.
        received = target_aws_client.sqs.receive_message(QueueUrl=queue_url)
        for message in received.get("Messages", []):
            body = json.loads(message["Body"])
            stepfunctions_events.append(body)
        stepfunctions_events.sort(key=lambda e: e["time"])
        return bool(stepfunctions_events) and (
            stepfunctions_events[-1]["detail"]["status"] != "RUNNING"
        )

    # Surface a timeout like the other await helpers do, instead of silently
    # continuing to the snapshot match with an incomplete event list.
    if not poll_condition(_get_events, timeout=60):
        LOG.warning(
            "Timed out whilst awaiting for a terminal event of execution '%s' on queue '%s'.",
            execution_arn,
            queue_url,
        )

    sfn_snapshot.match("stepfunctions_events", stepfunctions_events)
1✔
740

741

742
def record_sqs_events(target_aws_client, queue_url, sfn_snapshot, num_events):
    """Collect events from an SQS queue and snapshot them.

    The queue is polled (up to 60 seconds) until exactly ``num_events`` message
    bodies have been accumulated. The bodies are then ordered by the JSON dump
    of their ``detail`` field and matched under ``stepfunctions_events``.

    :param target_aws_client: client factory exposing an ``sqs`` client.
    :param queue_url: URL of the queue to read from.
    :param sfn_snapshot: snapshot object receiving the match.
    :param num_events: exact number of events to wait for.
    """
    stepfunctions_events = []

    def _get_events() -> bool:
        # Events accumulate across attempts; each attempt appends what it received.
        received = target_aws_client.sqs.receive_message(QueueUrl=queue_url)
        for message in received.get("Messages", []):
            body = json.loads(message["Body"])
            stepfunctions_events.append(body)
        stepfunctions_events.sort(key=lambda e: e["time"])
        return len(stepfunctions_events) == num_events

    # Surface a timeout like the other await helpers do, instead of silently
    # continuing to the snapshot match with an unexpected number of events.
    if not poll_condition(_get_events, timeout=60):
        LOG.warning(
            "Timed out whilst awaiting for %s events on queue '%s' (received %s).",
            num_events,
            queue_url,
            len(stepfunctions_events),
        )
    stepfunctions_events.sort(key=lambda e: json.dumps(e.get("detail", dict())))
    sfn_snapshot.match("stepfunctions_events", stepfunctions_events)
1✔
756

757

758
class SfnNoneRecursiveParallelTransformer:
    """
    Normalises a sublist of events triggered in by a Parallel state to be order-independent.

    Events between a ``ParallelStateStarted`` event and the next Parallel
    state end event are sorted by their JSON string form, after their ``id``
    and ``previousEventId`` fields have been overwritten with fixed values.
    """

    def __init__(self, events_jsonpath: str = "$..events"):
        # JSONPath expression locating the event lists to normalise.
        self.events_jsonpath: str = events_jsonpath

    @staticmethod
    def _normalise_events(events: list[dict]) -> None:
        """Sort, in place, each run of events enclosed by a Parallel state.

        :param events: history events; the list and its items are mutated.
        """
        start_idx = None
        sublist = list()
        in_sublist = False
        for i, event in enumerate(events):
            event_type = event.get("type")
            if event_type is None:
                LOG.debug(
                    "No 'type' in event item '%s'.",
                    event,
                )
                in_sublist = False

            elif event_type in {
                HistoryEventType.ParallelStateSucceeded,
                HistoryEventType.ParallelStateAborted,
                HistoryEventType.ParallelStateExited,
                HistoryEventType.ParallelStateFailed,
            }:
                # NOTE(review): start_idx and sublist are not reset here, so an
                # end-type event that is not preceded by a fresh
                # ParallelStateStarted re-applies the slice assignment over a
                # different range (from the list start when start_idx is still
                # None). Left unchanged because recorded snapshots may depend on
                # the current output - confirm before altering.
                events[start_idx:i] = sorted(sublist, key=to_json_str)
                in_sublist = False
            elif event_type == HistoryEventType.ParallelStateStarted:
                in_sublist = True
                sublist = []
                start_idx = i + 1
            elif in_sublist:
                # Overwrite the identifiers so they do not influence the sort.
                # NOTE(review): "id" is set to a one-element tuple whereas
                # "previousEventId" is a plain int - presumably unintended, but
                # kept as it ends up in the snapshotted data.
                event["id"] = (0,)
                event["previousEventId"] = 0
                sublist.append(event)

    def transform(self, input_data: dict, *, ctx: TransformContext) -> dict:
        """Normalise every event list found under ``self.events_jsonpath``.

        :param input_data: data to transform; matching lists are mutated in place.
        :param ctx: transformation context (unused here).
        :return: the same ``input_data`` object.
        """
        # Honour the configured jsonpath; the expression used to be hard-coded
        # to the default, which made the constructor argument ineffective.
        pattern = parse(self.events_jsonpath)
        events = pattern.find(input_data)
        if not events:
            LOG.debug("No Stepfunctions 'events' for jsonpath '%s'.", self.events_jsonpath)
            return input_data

        for events_data in events:
            self._normalise_events(events_data.value)

        return input_data
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc