• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 274ae585-9ad2-4b5f-8087-866ef08d3d6e

24 Apr 2025 05:15PM UTC coverage: 85.262% (-1.0%) from 86.266%
274ae585-9ad2-4b5f-8087-866ef08d3d6e

push

circleci

web-flow
CFn v2: support outputs (#12536)

10 of 29 new or added lines in 3 files covered. (34.48%)

1105 existing lines in 26 files now uncovered.

63256 of 74190 relevant lines covered (85.26%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.91
/localstack-core/localstack/testing/pytest/stepfunctions/utils.py
1
import json
1✔
2
import logging
1✔
3
from typing import Callable, Final, Optional
1✔
4

5
from botocore.exceptions import ClientError
1✔
6
from jsonpath_ng.ext import parse
1✔
7
from localstack_snapshot.snapshots.transformer import (
1✔
8
    JsonpathTransformer,
9
    RegexTransformer,
10
    TransformContext,
11
)
12

13
from localstack.aws.api.stepfunctions import (
1✔
14
    Arn,
15
    CloudWatchLogsLogGroup,
16
    CreateStateMachineOutput,
17
    Definition,
18
    ExecutionStatus,
19
    HistoryEventList,
20
    HistoryEventType,
21
    LogDestination,
22
    LoggingConfiguration,
23
    LogLevel,
24
    LongArn,
25
    StateMachineType,
26
)
27
from localstack.services.stepfunctions.asl.eval.event.logging import is_logging_enabled_for
1✔
28
from localstack.services.stepfunctions.asl.utils.encoding import to_json_str
1✔
29
from localstack.services.stepfunctions.asl.utils.json_path import NoSuchJsonPathError, extract_json
1✔
30
from localstack.testing.aws.util import is_aws_cloud
1✔
31
from localstack.utils.strings import short_uid
1✔
32
from localstack.utils.sync import poll_condition
1✔
33

34
LOG = logging.getLogger(__name__)
1✔
35

36

37
# Deleting an EXPRESS state machine is eventually consistent (usually under a minute), and
# executions still running may keep emitting logs after DeleteStateMachine has been called.
# The await helpers below therefore poll for up to this many seconds.
_DELETION_TIMEOUT_SECS: Final[int] = 120
# Seconds between polling attempts when the tests target AWS.
_SAMPLING_INTERVAL_SECONDS_AWS_CLOUD: Final[int] = 1
# Seconds between polling attempts when the tests target LocalStack.
_SAMPLING_INTERVAL_SECONDS_LOCALSTACK: Final[float] = 0.2
1✔
42

43

44
def _get_sampling_interval_seconds() -> int | float:
    """Return the pause, in seconds, to use between two polling attempts.

    The AWS interval is chosen when ``is_aws_cloud()`` is truthy; otherwise the
    LocalStack interval is used.
    """
    if is_aws_cloud():
        return _SAMPLING_INTERVAL_SECONDS_AWS_CLOUD
    return _SAMPLING_INTERVAL_SECONDS_LOCALSTACK
50

51

52
def await_no_state_machines_listed(stepfunctions_client):
    """Poll ``list_state_machines`` until it reports no state machines at all.

    Polling is bounded by ``_DELETION_TIMEOUT_SECS``. A timeout is only logged as a
    warning; nothing is raised and nothing is returned.

    :param stepfunctions_client: client exposing ``list_state_machines``.
    """
    listing_is_empty = poll_condition(
        condition=lambda: not stepfunctions_client.list_state_machines()["stateMachines"],
        timeout=_DELETION_TIMEOUT_SECS,
        interval=_get_sampling_interval_seconds(),
    )
    if listing_is_empty:
        return
    LOG.warning("Timed out whilst awaiting for listing to be empty.")
×
65

66

67
def _is_state_machine_alias_listed(
    stepfunctions_client, state_machine_arn: Arn, state_machine_alias_arn: Arn
):
    """Tell whether an alias ARN appears among the aliases listed for a state machine.

    A single ``list_state_machine_aliases`` call is made; no pagination token is followed.

    :param stepfunctions_client: client exposing ``list_state_machine_aliases``.
    :param state_machine_arn: ARN of the state machine whose aliases are listed.
    :param state_machine_alias_arn: alias ARN to look for.
    :return: ``True`` if a listed alias carries the given ARN, ``False`` otherwise.
    """
    response = stepfunctions_client.list_state_machine_aliases(stateMachineArn=state_machine_arn)
    return any(
        alias["stateMachineAliasArn"] == state_machine_alias_arn
        for alias in response["stateMachineAliases"]
    )
1✔
78

79

80
def await_state_machine_alias_is_created(
    stepfunctions_client, state_machine_arn: Arn, state_machine_alias_arn: Arn
):
    """Wait until the given alias appears in the state machine's alias listing.

    Polls ``_is_state_machine_alias_listed`` for at most ``_DELETION_TIMEOUT_SECS`` seconds.
    A timeout is only logged as a warning; nothing is raised and nothing is returned.

    :param stepfunctions_client: client used to list the state machine aliases.
    :param state_machine_arn: ARN of the state machine owning the alias.
    :param state_machine_alias_arn: ARN of the alias expected to become listed.
    """
    success = poll_condition(
        condition=lambda: _is_state_machine_alias_listed(
            stepfunctions_client=stepfunctions_client,
            state_machine_arn=state_machine_arn,
            state_machine_alias_arn=state_machine_alias_arn,
        ),
        timeout=_DELETION_TIMEOUT_SECS,
        interval=_get_sampling_interval_seconds(),
    )
    if not success:
        # Report what was actually awaited, including both ARNs, so a timeout is diagnosable.
        LOG.warning(
            "Timed out whilst awaiting for aliases of %s to include '%s'.",
            state_machine_arn,
            state_machine_alias_arn,
        )
×
94

95

96
def await_state_machine_alias_is_deleted(
    stepfunctions_client, state_machine_arn: Arn, state_machine_alias_arn: Arn
):
    """Wait until the given alias no longer appears in the state machine's alias listing.

    Polls the negation of ``_is_state_machine_alias_listed`` for at most
    ``_DELETION_TIMEOUT_SECS`` seconds. A timeout is only logged as a warning; nothing is
    raised and nothing is returned.

    :param stepfunctions_client: client used to list the state machine aliases.
    :param state_machine_arn: ARN of the state machine owning the alias.
    :param state_machine_alias_arn: ARN of the alias expected to disappear.
    """
    success = poll_condition(
        condition=lambda: not _is_state_machine_alias_listed(
            stepfunctions_client=stepfunctions_client,
            state_machine_arn=state_machine_arn,
            state_machine_alias_arn=state_machine_alias_arn,
        ),
        timeout=_DELETION_TIMEOUT_SECS,
        interval=_get_sampling_interval_seconds(),
    )
    if not success:
        # Report what was actually awaited, including both ARNs, so a timeout is diagnosable.
        LOG.warning(
            "Timed out whilst awaiting for aliases of %s to exclude '%s'.",
            state_machine_arn,
            state_machine_alias_arn,
        )
×
110

111

112
def _is_state_machine_listed(stepfunctions_client, state_machine_arn: str) -> bool:
    """Tell whether a state machine ARN appears in the ``list_state_machines`` response.

    A single listing call is made; no pagination token is followed.

    :param stepfunctions_client: client exposing ``list_state_machines``.
    :param state_machine_arn: ARN to look for.
    :return: ``True`` if a listed state machine carries the ARN, ``False`` otherwise.
    """
    listed = stepfunctions_client.list_state_machines()["stateMachines"]
    return any(entry["stateMachineArn"] == state_machine_arn for entry in listed)
1✔
119

120

121
def _is_state_machine_version_listed(
    stepfunctions_client, state_machine_arn: str, state_machine_version_arn: str
) -> bool:
    """Tell whether a version ARN appears among the versions listed for a state machine.

    A single ``list_state_machine_versions`` call is made; no pagination token is followed.

    :param stepfunctions_client: client exposing ``list_state_machine_versions``.
    :param state_machine_arn: ARN of the state machine whose versions are listed.
    :param state_machine_version_arn: version ARN to look for.
    :return: ``True`` if a listed version carries the ARN, ``False`` otherwise.
    """
    response = stepfunctions_client.list_state_machine_versions(stateMachineArn=state_machine_arn)
    return any(
        entry["stateMachineVersionArn"] == state_machine_version_arn
        for entry in response["stateMachineVersions"]
    )
1✔
130

131

132
def await_state_machine_not_listed(stepfunctions_client, state_machine_arn: str):
    """Wait until the state machine ARN is absent from the ``list_state_machines`` listing.

    Polling is bounded by ``_DELETION_TIMEOUT_SECS``. A timeout is only logged as a
    warning; nothing is raised and nothing is returned.

    :param stepfunctions_client: client used for the listing calls.
    :param state_machine_arn: ARN expected to disappear from the listing.
    """

    def _is_absent() -> bool:
        return not _is_state_machine_listed(stepfunctions_client, state_machine_arn)

    disappeared = poll_condition(
        condition=_is_absent,
        timeout=_DELETION_TIMEOUT_SECS,
        interval=_get_sampling_interval_seconds(),
    )
    if disappeared:
        return
    LOG.warning("Timed out whilst awaiting for listing to exclude '%s'.", state_machine_arn)
×
140

141

142
def await_state_machine_listed(stepfunctions_client, state_machine_arn: str):
    """Wait until the state machine ARN is present in the ``list_state_machines`` listing.

    Polling is bounded by ``_DELETION_TIMEOUT_SECS``. A timeout is only logged as a
    warning; nothing is raised and nothing is returned.

    :param stepfunctions_client: client used for the listing calls.
    :param state_machine_arn: ARN expected to appear in the listing.
    """

    def _is_present() -> bool:
        return _is_state_machine_listed(stepfunctions_client, state_machine_arn)

    appeared = poll_condition(
        condition=_is_present,
        timeout=_DELETION_TIMEOUT_SECS,
        interval=_get_sampling_interval_seconds(),
    )
    if appeared:
        return
    LOG.warning("Timed out whilst awaiting for listing to include '%s'.", state_machine_arn)
×
150

151

152
def await_state_machine_version_not_listed(
    stepfunctions_client, state_machine_arn: str, state_machine_version_arn: str
):
    """Wait until the version ARN is absent from the state machine's version listing.

    Polling is bounded by ``_DELETION_TIMEOUT_SECS``. A timeout is only logged as a
    warning; nothing is raised and nothing is returned.

    :param stepfunctions_client: client used for the listing calls.
    :param state_machine_arn: ARN of the state machine owning the version.
    :param state_machine_version_arn: version ARN expected to disappear.
    """

    def _is_absent() -> bool:
        return not _is_state_machine_version_listed(
            stepfunctions_client, state_machine_arn, state_machine_version_arn
        )

    disappeared = poll_condition(
        condition=_is_absent,
        timeout=_DELETION_TIMEOUT_SECS,
        interval=_get_sampling_interval_seconds(),
    )
    if disappeared:
        return
    LOG.warning(
        "Timed out whilst awaiting for version of %s to exclude '%s'.",
        state_machine_arn,
        state_machine_version_arn,
    )
168

169

170
def await_state_machine_version_listed(
    stepfunctions_client, state_machine_arn: str, state_machine_version_arn: str
):
    """Wait until the version ARN is present in the state machine's version listing.

    Polling is bounded by ``_DELETION_TIMEOUT_SECS``. A timeout is only logged as a
    warning; nothing is raised and nothing is returned.

    :param stepfunctions_client: client used for the listing calls.
    :param state_machine_arn: ARN of the state machine owning the version.
    :param state_machine_version_arn: version ARN expected to appear.
    """

    def _is_present() -> bool:
        return _is_state_machine_version_listed(
            stepfunctions_client, state_machine_arn, state_machine_version_arn
        )

    appeared = poll_condition(
        condition=_is_present,
        timeout=_DELETION_TIMEOUT_SECS,
        interval=_get_sampling_interval_seconds(),
    )
    if appeared:
        return
    LOG.warning(
        "Timed out whilst awaiting for version of %s to include '%s'.",
        state_machine_arn,
        state_machine_version_arn,
    )
186

187

188
def await_on_execution_events(
    stepfunctions_client, execution_arn: str, check_func: Callable[[HistoryEventList], bool]
) -> HistoryEventList:
    """Poll an execution's history until ``check_func`` accepts it, then return the events.

    On every attempt the history is re-fetched with ``get_execution_history``, sorted by each
    event's ``timestamp`` and handed to ``check_func``. A ``ClientError`` from the fetch counts
    as a failed attempt. The polling result is asserted, so a falsy result from
    ``poll_condition`` surfaces as an ``AssertionError``.

    :param stepfunctions_client: client exposing ``get_execution_history``.
    :param execution_arn: ARN of the execution whose history is inspected.
    :param check_func: predicate deciding whether the sorted event list is acceptable.
    :return: the event list of the last attempt.
    """
    history: HistoryEventList = []

    def _refresh_and_check():
        # Start every attempt from an empty list so events of earlier attempts never linger.
        del history[:]
        try:
            response = stepfunctions_client.get_execution_history(executionArn=execution_arn)
        except ClientError:
            return False
        fetched = response.get("events", [])
        history.extend(sorted(fetched, key=lambda item: item.get("timestamp")))
        return check_func(history)

    assert poll_condition(
        condition=_refresh_and_check, timeout=120, interval=_get_sampling_interval_seconds()
    )
    return history
1✔
208

209

210
def await_execution_success(stepfunctions_client, execution_arn: str) -> HistoryEventList:
    """Wait until the last history event of the execution carries success details.

    Delegates the polling to ``await_on_execution_events``.

    :param stepfunctions_client: client used to fetch the execution history.
    :param execution_arn: ARN of the execution to observe.
    :return: the execution's history events as returned by ``await_on_execution_events``.
    """

    def _last_event_reports_success(events: HistoryEventList) -> bool:
        # An empty history can never qualify; otherwise only the final event matters.
        if not len(events) > 0:
            return False
        return "executionSucceededEventDetails" in events[-1]

    return await_on_execution_events(
        stepfunctions_client=stepfunctions_client,
        execution_arn=execution_arn,
        check_func=_last_event_reports_success,
    )
222

223

224
def await_list_execution_status(
    stepfunctions_client, state_machine_arn: str, execution_arn: str, status: str
):
    """Wait until ``list_executions`` reports the execution with the given status.

    Needed because ``list_executions`` is eventually consistent with respect to
    ``describe_execution`` and ``get_execution_history``. A timeout (120 seconds) is only
    logged as a warning; nothing is raised and nothing is returned.

    :param stepfunctions_client: client exposing ``list_executions``.
    :param state_machine_arn: ARN of the state machine owning the execution.
    :param execution_arn: ARN of the execution expected in the listing.
    :param status: status used both as listing filter and as expected value.
    """

    def _is_listed_with_status() -> bool:
        response = stepfunctions_client.list_executions(
            stateMachineArn=state_machine_arn, statusFilter=status
        )
        return any(
            item["executionArn"] == execution_arn and item["status"] == status
            for item in response.get("executions", [])
        )

    reached = poll_condition(
        condition=_is_listed_with_status,
        timeout=120,
        interval=_get_sampling_interval_seconds(),
    )
    if reached:
        return
    LOG.warning(
        "Timed out whilst awaiting for execution status %s to satisfy condition for execution '%s'.",
        status,
        execution_arn,
    )
248

249

250
def _is_last_history_event_terminal(events: HistoryEventList) -> bool:
    """Tell whether the final event of ``events`` marks the end of an execution.

    An empty list is never terminal. A final event without a ``type`` entry is treated as
    terminal, as is one whose type is ExecutionFailed, ExecutionAborted, ExecutionTimedOut
    or ExecutionSucceeded.

    :param events: history events, the last one being the one inspected.
    :return: ``True`` if the last event is considered terminal, ``False`` otherwise.
    """
    if not len(events) > 0:
        return False
    final_type = events[-1].get("type")
    if final_type is None:
        return True
    terminal_types = {
        HistoryEventType.ExecutionFailed,
        HistoryEventType.ExecutionAborted,
        HistoryEventType.ExecutionTimedOut,
        HistoryEventType.ExecutionSucceeded,
    }
    return final_type in terminal_types
1✔
261

262

263
def await_execution_terminated(stepfunctions_client, execution_arn: str) -> HistoryEventList:
    """Wait until the execution's history ends with a terminal event.

    :param stepfunctions_client: client used to fetch the execution history.
    :param execution_arn: ARN of the execution to observe.
    :return: the execution's history events as returned by ``await_on_execution_events``.
    """
    terminated_history = await_on_execution_events(
        stepfunctions_client=stepfunctions_client,
        execution_arn=execution_arn,
        check_func=_is_last_history_event_terminal,
    )
    return terminated_history
269

270

271
def await_execution_lists_terminated(
    stepfunctions_client, state_machine_arn: str, execution_arn: str
):
    """Wait until ``list_executions`` shows the execution with a status other than RUNNING.

    The first listed entry matching ``execution_arn`` decides each attempt; an execution that
    is not listed counts as not yet terminated. A timeout (120 seconds) is only logged as a
    warning; nothing is raised and nothing is returned.

    :param stepfunctions_client: client exposing ``list_executions``.
    :param state_machine_arn: ARN of the state machine owning the execution.
    :param execution_arn: ARN of the execution to look for.
    """

    def _listed_as_not_running() -> bool:
        listing = stepfunctions_client.list_executions(stateMachineArn=state_machine_arn)
        entry = next(
            (item for item in listing["executions"] if item["executionArn"] == execution_arn),
            None,
        )
        if entry is None:
            return False
        return entry["status"] != ExecutionStatus.RUNNING

    finished = poll_condition(
        condition=_listed_as_not_running,
        timeout=120,
        interval=_get_sampling_interval_seconds(),
    )
    if finished:
        return
    LOG.warning(
        "Timed out whilst awaiting for execution events to satisfy condition for execution '%s'.",
        execution_arn,
    )
290

291

292
def await_execution_started(stepfunctions_client, execution_arn: str) -> HistoryEventList:
    """Wait until the execution's history contains an event with started details.

    Delegates the polling to ``await_on_execution_events``.

    :param stepfunctions_client: client used to fetch the execution history.
    :param execution_arn: ARN of the execution to observe.
    :return: the execution's history events as returned by ``await_on_execution_events``.
    """

    def _check_started_exists(events: HistoryEventList) -> bool:
        # Inspect every event: the previous loop returned on its first iteration and so
        # only ever looked at the first event of the history.
        return any("executionStartedEventDetails" in event for event in events)

    return await_on_execution_events(
        stepfunctions_client=stepfunctions_client,
        execution_arn=execution_arn,
        check_func=_check_started_exists,
    )
303

304

305
def await_execution_aborted(stepfunctions_client, execution_arn: str):
    """Wait until ``describe_execution`` reports the execution as ABORTED.

    A timeout (120 seconds) is only logged as a warning; nothing is raised and nothing is
    returned.

    :param stepfunctions_client: client exposing ``describe_execution``.
    :param execution_arn: ARN of the execution expected to abort.
    """

    def _is_aborted() -> bool:
        description = stepfunctions_client.describe_execution(executionArn=execution_arn)
        return description["status"] == ExecutionStatus.ABORTED

    aborted = poll_condition(
        condition=_is_aborted, timeout=120, interval=_get_sampling_interval_seconds()
    )
    if aborted:
        return
    LOG.warning("Timed out whilst awaiting for execution '%s' to abort.", execution_arn)
×
316

317

318
def get_expected_execution_logs(
    stepfunctions_client, log_level: LogLevel, execution_arn: LongArn
) -> HistoryEventList:
    """Return the history events of an execution that are loggable at ``log_level``.

    The history is fetched once with ``get_execution_history``; an event is kept when
    ``is_logging_enabled_for`` accepts its ``type`` for the given log level.

    :param stepfunctions_client: client exposing ``get_execution_history``.
    :param log_level: log level the events are filtered against.
    :param execution_arn: ARN of the execution whose history is read.
    :return: the retained events, in history order.
    """
    history = stepfunctions_client.get_execution_history(executionArn=execution_arn)
    loggable_events = []
    for history_event in history["events"]:
        if is_logging_enabled_for(log_level=log_level, history_event_type=history_event["type"]):
            loggable_events.append(history_event)
    return loggable_events
1✔
329

330

331
def is_execution_logs_list_complete(
    expected_events: HistoryEventList,
) -> Callable[[HistoryEventList], bool]:
    """Build a predicate that checks whether as many log events as expected were collected.

    The predicate accepts anything when ``expected_events`` is empty; otherwise it only
    compares lengths, not contents.

    :param expected_events: events expected to be logged.
    :return: a function taking the collected log events and returning a bool.
    """

    def _validation_function(log_events: list) -> bool:
        # With nothing expected there is nothing to wait for.
        return not expected_events or len(log_events) == len(expected_events)

    return _validation_function
1✔
340

341

342
def _await_on_execution_log_stream_created(target_aws_client, log_group_name: str) -> str:
    """Wait for a log stream other than the delivery-validation stream and return its name.

    Each attempt reads ``describe_log_streams`` for the log group and looks at the last
    returned stream. A ``ClientError`` or an empty stream list counts as a failed attempt.
    The polling result is asserted.

    :param target_aws_client: client factory whose ``logs`` client is used.
    :param log_group_name: name of the log group to inspect.
    :return: the name of the last listed log stream at the successful attempt.
    """
    logs_client = target_aws_client.logs
    latest_stream_name = ""

    def _stream_exists() -> bool:
        nonlocal latest_stream_name
        try:
            response = logs_client.describe_log_streams(logGroupName=log_group_name)
            streams = response["logStreams"]
        except ClientError:
            return False
        if not streams:
            return False

        latest_stream_name = streams[-1]["logStreamName"]
        # While only the validation stream exists, the stream for the execution has not
        # been created yet.
        return (
            latest_stream_name != "log_stream_created_by_aws_to_validate_log_delivery_subscriptions"
        )

    assert poll_condition(condition=_stream_exists)
    return latest_stream_name
1✔
368

369

370
def await_on_execution_logs(
    target_aws_client,
    log_group_name: str,
    validation_function: Optional[Callable[[HistoryEventList], bool]] = None,
) -> HistoryEventList:
    """Poll the execution's log stream until the logged events pass validation.

    The log stream name is first resolved via ``_await_on_execution_log_stream_created``.
    On each attempt the stream is read from its head with ``get_log_events`` and every
    message is JSON-decoded. A ``ClientError`` counts as a failed attempt. The polling
    result is asserted.

    :param target_aws_client: client factory whose ``logs`` client is used.
    :param log_group_name: name of the log group holding the execution's log stream.
    :param validation_function: predicate over the decoded events; when omitted, the first
        successfully fetched batch of events is accepted as is.
    :return: the decoded log events of the last attempt.
    """
    log_stream_name = _await_on_execution_log_stream_created(target_aws_client, log_group_name)

    logs_client = target_aws_client.logs
    events: HistoryEventList = []

    def _run_check():
        # Start every attempt from an empty list so events of earlier attempts never linger.
        events.clear()
        try:
            log_events = logs_client.get_log_events(
                logGroupName=log_group_name, logStreamName=log_stream_name, startFromHead=True
            )["events"]
            events.extend([json.loads(e["message"]) for e in log_events])
        except ClientError:
            return False

        if validation_function is None:
            # The default used to be called unconditionally, failing with a TypeError.
            return True
        return validation_function(events)

    assert poll_condition(condition=_run_check)
    return events
1✔
396

397

398
def create_state_machine_with_iam_role(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    snapshot,
    definition: Definition,
    logging_configuration: Optional[LoggingConfiguration] = None,
    state_machine_name: Optional[str] = None,
):
    """Create an IAM role and a state machine using it, registering snapshot transformers.

    Transformers are added for the role ARN, for request IDs / extended request IDs and for
    the ARN of the created state machine.

    :param target_aws_client: client factory passed on to both creation callables.
    :param create_state_machine_iam_role: callable returning the ARN of a new role.
    :param create_state_machine: callable creating the state machine, returning its response.
    :param snapshot: snapshot object receiving the transformers.
    :param definition: state machine definition.
    :param logging_configuration: passed to the creation call only when not ``None``.
    :param state_machine_name: name to use; a generated name is used when falsy.
    :return: the ARN of the created state machine.
    """
    role_arn = create_state_machine_iam_role(target_aws_client=target_aws_client)
    regex_replacements = (
        (role_arn, "snf_role_arn"),
        ("Extended Request ID: [a-zA-Z0-9-/=+]+", "Extended Request ID: <extended_request_id>"),
        ("Request ID: [a-zA-Z0-9-]+", "Request ID: <request_id>"),
    )
    for pattern, replacement in regex_replacements:
        snapshot.add_transformer(RegexTransformer(pattern, replacement))

    if state_machine_name:
        sm_name: str = state_machine_name
    else:
        sm_name = f"statemachine_create_and_record_execution_{short_uid()}"

    create_arguments = {"name": sm_name, "definition": definition, "roleArn": role_arn}
    if logging_configuration is not None:
        create_arguments["loggingConfiguration"] = logging_configuration

    creation_resp = create_state_machine(target_aws_client, **create_arguments)
    snapshot.add_transformer(snapshot.transform.sfn_sm_create_arn(creation_resp, 0))
    return creation_resp["stateMachineArn"]
1✔
431

432

433
def launch_and_record_execution(
    target_aws_client,
    sfn_snapshot,
    state_machine_arn,
    execution_input,
    verify_execution_description=False,
) -> LongArn:
    """Start an execution, wait for it to terminate and snapshot its history.

    :param target_aws_client: client factory whose ``stepfunctions`` client is used.
    :param sfn_snapshot: snapshot object receiving transformers and matches.
    :param state_machine_arn: ARN of the state machine to execute.
    :param execution_input: input passed to ``start_execution``.
    :param verify_execution_description: when truthy, the ``describe_execution`` response
        is snapshotted too (under ``describe_execution``).
    :return: the ARN of the started execution.
    """
    sfn_client = target_aws_client.stepfunctions
    start_response = sfn_client.start_execution(
        stateMachineArn=state_machine_arn, input=execution_input
    )
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_exec_arn(start_response, 0))
    execution_arn = start_response["executionArn"]

    await_execution_terminated(stepfunctions_client=sfn_client, execution_arn=execution_arn)

    if verify_execution_description:
        description = sfn_client.describe_execution(executionArn=execution_arn)
        sfn_snapshot.match("describe_execution", description)

    history = sfn_client.get_execution_history(executionArn=execution_arn)

    # Register one transformer per distinct map run ARN found in the history, if any.
    try:
        found_arns = extract_json("$..mapRunArn", history)
        # A single match comes back as a bare string rather than a list.
        distinct_arns = set([found_arns] if isinstance(found_arns, str) else found_arns)
        for position, map_run_arn in enumerate(distinct_arns):
            sfn_snapshot.add_transformer(
                sfn_snapshot.transform.sfn_map_run_arn(map_run_arn, position)
            )
    except NoSuchJsonPathError:
        # The history holds no mapRunArn entries.
        pass

    sfn_snapshot.match("get_execution_history", history)
    return execution_arn
1✔
471

472

473
def launch_and_record_mocked_execution(
    target_aws_client,
    sfn_snapshot,
    state_machine_arn,
    execution_input,
    test_name,
) -> LongArn:
    """Start an execution addressed as ``<state_machine_arn>#<test_name>`` and snapshot it.

    The execution is awaited until terminated and its history is snapshotted under
    ``get_execution_history``.

    :param target_aws_client: client factory whose ``stepfunctions`` client is used.
    :param sfn_snapshot: snapshot object receiving transformers and matches.
    :param state_machine_arn: ARN of the state machine to execute.
    :param execution_input: input passed to ``start_execution``.
    :param test_name: suffix appended after ``#`` to the state machine ARN.
    :return: the ARN of the started execution.
    """
    sfn_client = target_aws_client.stepfunctions
    mocked_target_arn = f"{state_machine_arn}#{test_name}"
    start_response = sfn_client.start_execution(
        stateMachineArn=mocked_target_arn, input=execution_input
    )
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_exec_arn(start_response, 0))
    execution_arn = start_response["executionArn"]

    await_execution_terminated(stepfunctions_client=sfn_client, execution_arn=execution_arn)

    history = sfn_client.get_execution_history(executionArn=execution_arn)

    # Register one transformer per distinct map run ARN found in the history, if any.
    try:
        found_arns = extract_json("$..mapRunArn", history)
        # A single match comes back as a bare string rather than a list.
        distinct_arns = set([found_arns] if isinstance(found_arns, str) else found_arns)
        for position, map_run_arn in enumerate(distinct_arns):
            sfn_snapshot.add_transformer(
                sfn_snapshot.transform.sfn_map_run_arn(map_run_arn, position)
            )
    except NoSuchJsonPathError:
        # The history holds no mapRunArn entries.
        pass

    sfn_snapshot.match("get_execution_history", history)
    return execution_arn
1✔
507

508

509
def launch_and_record_logs(
    target_aws_client,
    state_machine_arn,
    execution_input,
    log_level,
    log_group_name,
    sfn_snapshot,
):
    """Run an execution, then snapshot the events it logged to the given log group.

    Nothing beyond the execution itself is recorded when the log level is ``OFF`` or when
    no history event is loggable at ``log_level``.

    :param target_aws_client: client factory for the Step Functions and logs clients.
    :param state_machine_arn: ARN of the state machine to execute.
    :param execution_input: input passed to the execution.
    :param log_level: log level used to derive the expected logged events.
    :param log_group_name: name of the log group the execution logs to.
    :param sfn_snapshot: snapshot object receiving transformers and matches.
    """
    execution_arn = launch_and_record_execution(
        target_aws_client, sfn_snapshot, state_machine_arn, execution_input
    )
    expected_events = get_expected_execution_logs(
        target_aws_client.stepfunctions, log_level, execution_arn
    )

    nothing_to_expect = log_level == LogLevel.OFF or not expected_events
    if nothing_to_expect:
        # No log stream would have been created for this execution, so stop here.
        return

    logged_execution_events = await_on_execution_logs(
        target_aws_client, log_group_name, is_execution_logs_list_complete(expected_events)
    )

    timestamp_transformer = JsonpathTransformer(
        jsonpath="$..event_timestamp",
        replacement="timestamp",
        replace_reference=False,
    )
    sfn_snapshot.add_transformer(timestamp_transformer)
    sfn_snapshot.match("logged_execution_events", logged_execution_events)
1✔
544

545

546
def create_and_record_execution(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    sfn_snapshot,
    definition,
    execution_input,
    verify_execution_description=False,
) -> LongArn:
    """Create a state machine with a fresh IAM role, execute it and snapshot the run.

    :param target_aws_client: client factory used for all AWS calls.
    :param create_state_machine_iam_role: callable returning the ARN of a new role.
    :param create_state_machine: callable creating the state machine.
    :param sfn_snapshot: snapshot object receiving transformers and matches.
    :param definition: state machine definition.
    :param execution_input: input passed to the execution.
    :param verify_execution_description: forwarded to ``launch_and_record_execution``.
    :return: the ARN of the recorded execution, for manual assertions by the caller.
    """
    state_machine_arn = create_state_machine_with_iam_role(
        target_aws_client,
        create_state_machine_iam_role,
        create_state_machine,
        sfn_snapshot,
        definition,
    )
    # Hand the execution ARN back to the caller; callers ignoring the result are unaffected.
    return launch_and_record_execution(
        target_aws_client,
        sfn_snapshot,
        state_machine_arn,
        execution_input,
        verify_execution_description,
    )
570

571

572
def create_and_record_mocked_execution(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    sfn_snapshot,
    definition,
    execution_input,
    state_machine_name,
    test_name,
):
    """Create a named state machine and record a mocked execution of it.

    :param target_aws_client: client factory used for all AWS calls.
    :param create_state_machine_iam_role: callable returning the ARN of a new role.
    :param create_state_machine: callable creating the state machine.
    :param sfn_snapshot: snapshot object receiving transformers and matches.
    :param definition: state machine definition.
    :param execution_input: input passed to the execution.
    :param state_machine_name: name given to the created state machine.
    :param test_name: forwarded to ``launch_and_record_mocked_execution``.
    """
    created_arn = create_state_machine_with_iam_role(
        target_aws_client,
        create_state_machine_iam_role,
        create_state_machine,
        sfn_snapshot,
        definition,
        state_machine_name=state_machine_name,
    )
    launch_and_record_mocked_execution(
        target_aws_client,
        sfn_snapshot,
        created_arn,
        execution_input,
        test_name,
    )
593

594

595
def create_and_record_logs(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    sfn_create_log_group,
    sfn_snapshot,
    definition,
    execution_input,
    log_level: LogLevel,
    include_execution_data: bool,
):
    """Create a state machine, attach CloudWatch logging to it and record an execution's logs.

    The state machine is created first and its logging configuration is applied afterwards
    through ``update_state_machine``.

    :param target_aws_client: client factory used for all AWS calls.
    :param create_state_machine_iam_role: callable returning the ARN of a new role.
    :param create_state_machine: callable creating the state machine.
    :param sfn_create_log_group: callable creating a log group and returning its name.
    :param sfn_snapshot: snapshot object receiving transformers and matches.
    :param definition: state machine definition.
    :param execution_input: input passed to the execution.
    :param log_level: log level set in the logging configuration.
    :param include_execution_data: value of ``includeExecutionData`` in the configuration.
    """
    state_machine_arn = create_state_machine_with_iam_role(
        target_aws_client,
        create_state_machine_iam_role,
        create_state_machine,
        sfn_snapshot,
        definition,
    )

    log_group_name = sfn_create_log_group()
    # The first group returned for the name prefix is taken to be the one just created.
    described_groups = target_aws_client.logs.describe_log_groups(
        logGroupNamePrefix=log_group_name
    )
    log_group_arn = described_groups["logGroups"][0]["arn"]

    destination = LogDestination(
        cloudWatchLogsLogGroup=CloudWatchLogsLogGroup(logGroupArn=log_group_arn)
    )
    logging_configuration = LoggingConfiguration(
        level=log_level,
        includeExecutionData=include_execution_data,
        destinations=[destination],
    )
    target_aws_client.stepfunctions.update_state_machine(
        stateMachineArn=state_machine_arn, loggingConfiguration=logging_configuration
    )

    launch_and_record_logs(
        target_aws_client,
        state_machine_arn,
        execution_input,
        log_level,
        log_group_name,
        sfn_snapshot,
    )
639

640

641
def launch_and_record_sync_execution(
    target_aws_client,
    sfn_snapshot,
    state_machine_arn,
    execution_input,
):
    """Run a synchronous execution and snapshot the ``start_sync_execution`` response.

    :param target_aws_client: client factory whose ``stepfunctions`` client is used.
    :param sfn_snapshot: snapshot object receiving the transformer and the match.
    :param state_machine_arn: ARN of the state machine to execute.
    :param execution_input: input passed to the execution.
    """
    sfn_client = target_aws_client.stepfunctions
    sync_response = sfn_client.start_sync_execution(
        stateMachineArn=state_machine_arn, input=execution_input
    )
    arn_transformer = sfn_snapshot.transform.sfn_sm_sync_exec_arn(sync_response, 0)
    sfn_snapshot.add_transformer(arn_transformer)
    sfn_snapshot.match("start_execution_sync_response", sync_response)
1✔
653

654

655
def create_and_record_express_sync_execution(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    sfn_snapshot,
    definition,
    execution_input,
):
    """Create an EXPRESS state machine and record a synchronous execution of it.

    The creation response is snapshotted under ``creation_response``.

    :param target_aws_client: client factory used for all AWS calls.
    :param create_state_machine_iam_role: callable returning the ARN of a new role.
    :param create_state_machine: callable creating the state machine.
    :param sfn_snapshot: snapshot object receiving transformers and matches.
    :param definition: state machine definition.
    :param execution_input: input passed to the execution.
    """
    role_arn = create_state_machine_iam_role(target_aws_client=target_aws_client)
    sfn_snapshot.add_transformer(RegexTransformer(role_arn, "sfn_role_arn"))

    express_name = f"express_statemachine_{short_uid()}"
    creation_response = create_state_machine(
        target_aws_client,
        name=express_name,
        definition=definition,
        roleArn=role_arn,
        type=StateMachineType.EXPRESS,
    )
    state_machine_arn = creation_response["stateMachineArn"]
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_create_arn(creation_response, 0))
    sfn_snapshot.match("creation_response", creation_response)

    launch_and_record_sync_execution(
        target_aws_client, sfn_snapshot, state_machine_arn, execution_input
    )
683

684

685
def launch_and_record_express_async_execution(
    target_aws_client,
    sfn_snapshot,
    state_machine_arn,
    log_group_name,
    execution_input,
):
    """Start an execution and snapshot the final event it logged.

    The logged events are awaited until the last one is terminal according to
    ``_is_last_history_event_terminal``.

    :param target_aws_client: client factory for the Step Functions and logs clients.
    :param sfn_snapshot: snapshot object receiving the transformer and the match.
    :param state_machine_arn: ARN of the state machine to execute.
    :param log_group_name: name of the log group the execution logs to.
    :param execution_input: input passed to ``start_execution``.
    :return: the ARN of the started execution.
    """
    sfn_client = target_aws_client.stepfunctions
    start_response = sfn_client.start_execution(
        stateMachineArn=state_machine_arn, input=execution_input
    )
    arn_transformer = sfn_snapshot.transform.sfn_sm_express_exec_arn(start_response, 0)
    sfn_snapshot.add_transformer(arn_transformer)
    execution_arn = start_response["executionArn"]

    logged_events = await_on_execution_logs(
        target_aws_client, log_group_name, validation_function=_is_last_history_event_terminal
    )
    # Only the end event is snapshotted: logging of earlier events is flaky on AWS.
    sfn_snapshot.match("end_event", logged_events[-1])

    return execution_arn
1✔
706

707

708
def create_and_record_express_async_execution(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    sfn_create_log_group,
    sfn_snapshot,
    definition,
    execution_input,
    include_execution_data: bool = True,
) -> tuple[LongArn, LongArn]:
    """Create a logging EXPRESS state machine and record an asynchronous execution of it.

    The state machine logs at level ``ALL`` to a freshly created log group; the creation
    response is snapshotted under ``creation_response``.

    :param target_aws_client: client factory used for all AWS calls.
    :param create_state_machine_iam_role: callable returning the ARN of a new role.
    :param create_state_machine: callable creating the state machine.
    :param sfn_create_log_group: callable creating a log group and returning its name.
    :param sfn_snapshot: snapshot object receiving transformers and matches.
    :param definition: state machine definition.
    :param execution_input: input passed to the execution.
    :param include_execution_data: value of ``includeExecutionData`` in the configuration.
    :return: the state machine ARN and the execution ARN, in that order.
    """
    role_arn = create_state_machine_iam_role(target_aws_client)
    sfn_snapshot.add_transformer(RegexTransformer(role_arn, "sfn_role_arn"))

    log_group_name = sfn_create_log_group()
    # The first group returned for the name prefix is taken to be the one just created.
    described_groups = target_aws_client.logs.describe_log_groups(
        logGroupNamePrefix=log_group_name
    )
    log_group_arn = described_groups["logGroups"][0]["arn"]

    destination = LogDestination(
        cloudWatchLogsLogGroup=CloudWatchLogsLogGroup(logGroupArn=log_group_arn)
    )
    logging_configuration = LoggingConfiguration(
        level=LogLevel.ALL,
        includeExecutionData=include_execution_data,
        destinations=[destination],
    )

    express_name = f"express_statemachine_{short_uid()}"
    creation_response = create_state_machine(
        target_aws_client,
        name=express_name,
        definition=definition,
        roleArn=role_arn,
        type=StateMachineType.EXPRESS,
        loggingConfiguration=logging_configuration,
    )
    state_machine_arn = creation_response["stateMachineArn"]
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_create_arn(creation_response, 0))
    sfn_snapshot.match("creation_response", creation_response)

    execution_arn = launch_and_record_express_async_execution(
        target_aws_client, sfn_snapshot, state_machine_arn, log_group_name, execution_input
    )
    return state_machine_arn, execution_arn
1✔
755

756

757
def create_and_record_events(
    create_state_machine_iam_role,
    create_state_machine,
    sfn_events_to_sqs_queue,
    target_aws_client,
    sfn_snapshot,
    definition,
    execution_input,
):
    """Run an execution and snapshot the events it caused to arrive on an SQS queue.

    Messages are collected from the queue (for up to 60 seconds) until the latest event,
    ordered by ``time``, reports a status other than ``RUNNING``. Whatever was collected is
    snapshotted under ``stepfunctions_events``, whether or not polling succeeded.

    :param create_state_machine_iam_role: callable returning the ARN of a new role.
    :param create_state_machine: callable creating the state machine.
    :param sfn_events_to_sqs_queue: callable taking ``state_machine_arn`` and returning the
        URL of the queue to read from.
    :param target_aws_client: client factory used for all AWS calls.
    :param sfn_snapshot: snapshot object receiving transformers and the match.
    :param definition: state machine definition.
    :param execution_input: input passed to the execution.
    """
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sqs_integration())
    detail_replacements = (
        ("$..detail.startDate", "start-date"),
        ("$..detail.stopDate", "stop-date"),
        # Deliberately reproduced as a plain string: the braces are literal replacement text.
        ("$..detail.name", "test_event_bridge_events-{short_uid()}"),
    )
    sfn_snapshot.add_transformers_list(
        [
            JsonpathTransformer(jsonpath=path, replacement=label, replace_reference=False)
            for path, label in detail_replacements
        ]
    )

    role_arn = create_state_machine_iam_role(target_aws_client)
    create_output: CreateStateMachineOutput = create_state_machine(
        target_aws_client,
        name=f"test_event_bridge_events-{short_uid()}",
        definition=definition,
        roleArn=role_arn,
    )
    state_machine_arn = create_output["stateMachineArn"]

    queue_url = sfn_events_to_sqs_queue(state_machine_arn=state_machine_arn)

    sfn_client = target_aws_client.stepfunctions
    start_response = sfn_client.start_execution(
        stateMachineArn=state_machine_arn, input=execution_input
    )
    await_execution_terminated(
        stepfunctions_client=sfn_client, execution_arn=start_response["executionArn"]
    )

    stepfunctions_events = []

    def _collect_events():
        # Accumulate across attempts: each receive call may return only some messages.
        response = target_aws_client.sqs.receive_message(QueueUrl=queue_url)
        stepfunctions_events.extend(
            [json.loads(message["Body"]) for message in response.get("Messages", [])]
        )
        stepfunctions_events.sort(key=lambda item: item["time"])
        return stepfunctions_events and stepfunctions_events[-1]["detail"]["status"] != "RUNNING"

    poll_condition(_collect_events, timeout=60)

    sfn_snapshot.match("stepfunctions_events", stepfunctions_events)
1✔
819

820

821
def record_sqs_events(target_aws_client, queue_url, sfn_snapshot, num_events):
    """Collect ``num_events`` messages from an SQS queue and snapshot their bodies.

    Messages are accumulated (for up to 60 seconds) until exactly ``num_events`` have been
    gathered. The collected bodies are then ordered by the JSON dump of their ``detail``
    entry and snapshotted under ``stepfunctions_events``, whether or not polling succeeded.

    :param target_aws_client: client factory whose ``sqs`` client is used.
    :param queue_url: URL of the queue to read from.
    :param sfn_snapshot: snapshot object receiving the match.
    :param num_events: number of events to wait for.
    """
    collected = []

    def _collect_events():
        # Accumulate across attempts: each receive call may return only some messages.
        response = target_aws_client.sqs.receive_message(QueueUrl=queue_url)
        collected.extend(
            [json.loads(message["Body"]) for message in response.get("Messages", [])]
        )
        collected.sort(key=lambda item: item["time"])
        return len(collected) == num_events

    poll_condition(_collect_events, timeout=60)
    # Final ordering is by content rather than by time.
    collected.sort(key=lambda item: json.dumps(item.get("detail", {})))
    sfn_snapshot.match("stepfunctions_events", collected)
1✔
835

836

837
class SfnNoneRecursiveParallelTransformer:
    """
    Normalises a sublist of events triggered by a Parallel state to be order-independent.

    Events found between a ``ParallelStateStarted`` event and the next Parallel end event
    (succeeded, aborted, exited or failed) have their ``id`` and ``previousEventId``
    overwritten and are sorted by their JSON representation. Nested Parallel states are not
    handled recursively.
    """

    def __init__(self, events_jsonpath: str = "$..events"):
        # JSONPath expression locating the event lists to normalise in the input data.
        self.events_jsonpath: str = events_jsonpath

    @staticmethod
    def _normalise_events(events: list[dict]) -> None:
        """Normalise, in place, the events enclosed by Parallel start/end events.

        :param events: history events; the list and its items are mutated.
        """
        parallel_end_types = {
            HistoryEventType.ParallelStateSucceeded,
            HistoryEventType.ParallelStateAborted,
            HistoryEventType.ParallelStateExited,
            HistoryEventType.ParallelStateFailed,
        }
        start_idx = None
        sublist = []
        in_sublist = False
        for i, event in enumerate(events):
            event_type = event.get("type")
            if event_type is None:
                LOG.debug(
                    "No 'type' in event item '%s'.",
                    event,
                )
                in_sublist = False

            elif event_type in parallel_end_types:
                # NOTE(review): this also runs when no ParallelStateStarted is pending
                # (start_idx is None, or a second end event follows the first), in which
                # case the slice assignment can shrink `events`. Confirm whether recorded
                # snapshots depend on that before guarding it with `in_sublist`.
                events[start_idx:i] = sorted(sublist, key=to_json_str)
                in_sublist = False
            elif event_type == HistoryEventType.ParallelStateStarted:
                in_sublist = True
                sublist = []
                start_idx = i + 1
            elif in_sublist:
                # Identifiers depend on scheduling order, so they are overwritten with
                # constants. The one-element tuple for `id` is kept exactly as it was.
                event["id"] = (0,)
                event["previousEventId"] = 0
                sublist.append(event)

    def transform(self, input_data: dict, *, ctx: TransformContext) -> dict:
        """Normalise every event list matched by ``self.events_jsonpath`` in ``input_data``.

        :param input_data: data to transform; matched event lists are mutated in place.
        :param ctx: transformation context; not used by this transformer.
        :return: the same ``input_data`` object.
        """
        # Honour the configured path; a hard-coded "$..events" used to ignore it.
        pattern = parse(self.events_jsonpath)
        events = pattern.find(input_data)
        if not events:
            LOG.debug("No Stepfunctions 'events' for jsonpath '%s'.", self.events_jsonpath)
            return input_data

        for events_data in events:
            self._normalise_events(events_data.value)

        return input_data
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc