• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 22166837280

19 Feb 2026 02:47AM UTC coverage: 87.003% (-0.004%) from 87.007%
22166837280

push

github

web-flow
SNS: Move Tagging Functionality to Provider Methods (#13608)

31 of 31 new or added lines in 1 file covered. (100.0%)

122 existing lines in 11 files now uncovered.

69763 of 80185 relevant lines covered (87.0%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.93
/localstack-core/localstack/testing/pytest/stepfunctions/utils.py
1
import json
1✔
2
import logging
1✔
3
from collections.abc import Callable
1✔
4
from typing import Final
1✔
5

6
from botocore.exceptions import ClientError
1✔
7
from jsonpath_ng.ext import parse
1✔
8
from localstack_snapshot.snapshots.transformer import (
1✔
9
    JsonpathTransformer,
10
    RegexTransformer,
11
    TransformContext,
12
)
13

14
from localstack import config
1✔
15
from localstack.aws.api.stepfunctions import (
1✔
16
    Arn,
17
    CloudWatchLogsLogGroup,
18
    CreateStateMachineOutput,
19
    Definition,
20
    ExecutionStatus,
21
    HistoryEventList,
22
    HistoryEventType,
23
    LogDestination,
24
    LoggingConfiguration,
25
    LogLevel,
26
    LongArn,
27
    StateMachineType,
28
)
29
from localstack.services.stepfunctions.asl.eval.event.logging import is_logging_enabled_for
1✔
30
from localstack.services.stepfunctions.asl.utils.encoding import to_json_str
1✔
31
from localstack.services.stepfunctions.asl.utils.json_path import NoSuchJsonPathError, extract_json
1✔
32
from localstack.testing.aws.util import is_aws_cloud
1✔
33
from localstack.utils.strings import short_uid
1✔
34
from localstack.utils.sync import poll_condition
1✔
35

36
LOG = logging.getLogger(__name__)
1✔
37

38

39
# For EXPRESS state machines, the deletion will happen eventually (usually less than a minute).
40
# Running executions may emit logs after DeleteStateMachine API is called.
41
_DELETION_TIMEOUT_SECS: Final[int] = 120
1✔
42
_SAMPLING_INTERVAL_SECONDS_AWS_CLOUD: Final[int] = 1
1✔
43
_SAMPLING_INTERVAL_SECONDS_LOCALSTACK: Final[float] = 0.2
1✔
44

45

46
def _get_sampling_interval_seconds() -> int | float:
1✔
47
    return (
1✔
48
        _SAMPLING_INTERVAL_SECONDS_AWS_CLOUD
49
        if is_aws_cloud()
50
        else _SAMPLING_INTERVAL_SECONDS_LOCALSTACK
51
    )
52

53

54
def await_no_state_machines_listed(stepfunctions_client):
1✔
55
    def _is_empty_state_machine_list():
×
56
        lst_resp = stepfunctions_client.list_state_machines()
×
57
        state_machines = lst_resp["stateMachines"]
×
58
        return not bool(state_machines)
×
59

60
    success = poll_condition(
×
61
        condition=_is_empty_state_machine_list,
62
        timeout=_DELETION_TIMEOUT_SECS,
63
        interval=_get_sampling_interval_seconds(),
64
    )
65
    if not success:
×
66
        LOG.warning("Timed out whilst awaiting for listing to be empty.")
×
67

68

69
def _is_state_machine_alias_listed(
1✔
70
    stepfunctions_client, state_machine_arn: Arn, state_machine_alias_arn: Arn
71
):
72
    list_state_machine_aliases_list = stepfunctions_client.list_state_machine_aliases(
1✔
73
        stateMachineArn=state_machine_arn
74
    )
75
    state_machine_aliases = list_state_machine_aliases_list["stateMachineAliases"]
1✔
76
    for state_machine_alias in state_machine_aliases:
1✔
77
        if state_machine_alias["stateMachineAliasArn"] == state_machine_alias_arn:
1✔
78
            return True
1✔
79
    return False
1✔
80

81

82
def await_state_machine_alias_is_created(
1✔
83
    stepfunctions_client, state_machine_arn: Arn, state_machine_alias_arn: Arn
84
):
85
    success = poll_condition(
1✔
86
        condition=lambda: _is_state_machine_alias_listed(
87
            stepfunctions_client=stepfunctions_client,
88
            state_machine_arn=state_machine_arn,
89
            state_machine_alias_arn=state_machine_alias_arn,
90
        ),
91
        timeout=_DELETION_TIMEOUT_SECS,
92
        interval=_get_sampling_interval_seconds(),
93
    )
94
    if not success:
1✔
95
        LOG.warning("Timed out whilst awaiting for listing to be empty.")
×
96

97

98
def await_state_machine_alias_is_deleted(
1✔
99
    stepfunctions_client, state_machine_arn: Arn, state_machine_alias_arn: Arn
100
):
101
    success = poll_condition(
1✔
102
        condition=lambda: (
103
            not _is_state_machine_alias_listed(
104
                stepfunctions_client=stepfunctions_client,
105
                state_machine_arn=state_machine_arn,
106
                state_machine_alias_arn=state_machine_alias_arn,
107
            )
108
        ),
109
        timeout=_DELETION_TIMEOUT_SECS,
110
        interval=_get_sampling_interval_seconds(),
111
    )
112
    if not success:
1✔
113
        LOG.warning("Timed out whilst awaiting for listing to be empty.")
×
114

115

116
def _is_state_machine_listed(stepfunctions_client, state_machine_arn: str) -> bool:
1✔
117
    lst_resp = stepfunctions_client.list_state_machines()
1✔
118
    state_machines = lst_resp["stateMachines"]
1✔
119
    for state_machine in state_machines:
1✔
120
        if state_machine["stateMachineArn"] == state_machine_arn:
1✔
121
            return True
1✔
122
    return False
1✔
123

124

125
def _is_state_machine_version_listed(
1✔
126
    stepfunctions_client, state_machine_arn: str, state_machine_version_arn: str
127
) -> bool:
128
    lst_resp = stepfunctions_client.list_state_machine_versions(stateMachineArn=state_machine_arn)
1✔
129
    versions = lst_resp["stateMachineVersions"]
1✔
130
    for version in versions:
1✔
131
        if version["stateMachineVersionArn"] == state_machine_version_arn:
1✔
132
            return True
1✔
133
    return False
1✔
134

135

136
def await_state_machine_not_listed(stepfunctions_client, state_machine_arn: str):
1✔
137
    success = poll_condition(
1✔
138
        condition=lambda: not _is_state_machine_listed(stepfunctions_client, state_machine_arn),
139
        timeout=_DELETION_TIMEOUT_SECS,
140
        interval=_get_sampling_interval_seconds(),
141
    )
142
    if not success:
1✔
143
        LOG.warning("Timed out whilst awaiting for listing to exclude '%s'.", state_machine_arn)
×
144

145

146
def await_state_machine_listed(stepfunctions_client, state_machine_arn: str):
1✔
147
    success = poll_condition(
1✔
148
        condition=lambda: _is_state_machine_listed(stepfunctions_client, state_machine_arn),
149
        timeout=_DELETION_TIMEOUT_SECS,
150
        interval=_get_sampling_interval_seconds(),
151
    )
152
    if not success:
1✔
153
        LOG.warning("Timed out whilst awaiting for listing to include '%s'.", state_machine_arn)
×
154

155

156
def await_state_machine_version_not_listed(
1✔
157
    stepfunctions_client, state_machine_arn: str, state_machine_version_arn: str
158
):
159
    success = poll_condition(
1✔
160
        condition=lambda: (
161
            not _is_state_machine_version_listed(
162
                stepfunctions_client, state_machine_arn, state_machine_version_arn
163
            )
164
        ),
165
        timeout=_DELETION_TIMEOUT_SECS,
166
        interval=_get_sampling_interval_seconds(),
167
    )
168
    if not success:
1✔
169
        LOG.warning(
×
170
            "Timed out whilst awaiting for version of %s to exclude '%s'.",
171
            state_machine_arn,
172
            state_machine_version_arn,
173
        )
174

175

176
def await_state_machine_version_listed(
1✔
177
    stepfunctions_client, state_machine_arn: str, state_machine_version_arn: str
178
):
179
    success = poll_condition(
1✔
180
        condition=lambda: _is_state_machine_version_listed(
181
            stepfunctions_client, state_machine_arn, state_machine_version_arn
182
        ),
183
        timeout=_DELETION_TIMEOUT_SECS,
184
        interval=_get_sampling_interval_seconds(),
185
    )
186
    if not success:
1✔
187
        LOG.warning(
×
188
            "Timed out whilst awaiting for version of %s to include '%s'.",
189
            state_machine_arn,
190
            state_machine_version_arn,
191
        )
192

193

194
def await_on_execution_events(
1✔
195
    stepfunctions_client, execution_arn: str, check_func: Callable[[HistoryEventList], bool]
196
) -> HistoryEventList:
197
    events: HistoryEventList = []
1✔
198

199
    def _run_check():
1✔
200
        nonlocal events
201
        events.clear()
1✔
202
        try:
1✔
203
            hist_resp = stepfunctions_client.get_execution_history(executionArn=execution_arn)
1✔
204
        except ClientError:
×
205
            return False
×
206
        events.extend(sorted(hist_resp.get("events", []), key=lambda event: event.get("timestamp")))
1✔
207
        res: bool = check_func(events)
1✔
208
        return res
1✔
209

210
    assert poll_condition(
1✔
211
        condition=_run_check, timeout=120, interval=_get_sampling_interval_seconds()
212
    )
213
    return events
1✔
214

215

216
def await_execution_success(stepfunctions_client, execution_arn: str) -> HistoryEventList:
1✔
217
    def _check_last_is_success(events: HistoryEventList) -> bool:
1✔
218
        if len(events) > 0:
1✔
219
            last_event = events[-1]
1✔
220
            return "executionSucceededEventDetails" in last_event
1✔
221
        return False
×
222

223
    return await_on_execution_events(
1✔
224
        stepfunctions_client=stepfunctions_client,
225
        execution_arn=execution_arn,
226
        check_func=_check_last_is_success,
227
    )
228

229

230
def await_list_execution_status(
1✔
231
    stepfunctions_client, state_machine_arn: str, execution_arn: str, status: str
232
):
233
    """required as there is some eventual consistency in list_executions vs describe_execution and get_execution_history"""
234

235
    def _run_check():
1✔
236
        list_resp = stepfunctions_client.list_executions(
1✔
237
            stateMachineArn=state_machine_arn, statusFilter=status
238
        )
239
        for execution in list_resp.get("executions", []):
1✔
240
            if execution["executionArn"] != execution_arn or execution["status"] != status:
1✔
241
                continue
×
242
            return True
1✔
UNCOV
243
        return False
×
244

245
    success = poll_condition(
1✔
246
        condition=_run_check, timeout=120, interval=_get_sampling_interval_seconds()
247
    )
248
    if not success:
1✔
249
        LOG.warning(
×
250
            "Timed out whilst awaiting for execution status %s to satisfy condition for execution '%s'.",
251
            status,
252
            execution_arn,
253
        )
254

255

256
def _is_last_history_event_terminal(events: HistoryEventList) -> bool:
1✔
257
    if len(events) > 0:
1✔
258
        last_event = events[-1]
1✔
259
        last_event_type = last_event.get("type")
1✔
260
        return last_event_type is None or last_event_type in {
1✔
261
            HistoryEventType.ExecutionFailed,
262
            HistoryEventType.ExecutionAborted,
263
            HistoryEventType.ExecutionTimedOut,
264
            HistoryEventType.ExecutionSucceeded,
265
        }
266
    return False
1✔
267

268

269
def await_execution_terminated(stepfunctions_client, execution_arn: str) -> HistoryEventList:
1✔
270
    return await_on_execution_events(
1✔
271
        stepfunctions_client=stepfunctions_client,
272
        execution_arn=execution_arn,
273
        check_func=_is_last_history_event_terminal,
274
    )
275

276

277
def await_execution_lists_terminated(
1✔
278
    stepfunctions_client, state_machine_arn: str, execution_arn: str
279
):
280
    def _check_last_is_terminal() -> bool:
1✔
281
        list_output = stepfunctions_client.list_executions(stateMachineArn=state_machine_arn)
1✔
282
        executions = list_output["executions"]
1✔
283
        for execution in executions:
1✔
284
            if execution["executionArn"] == execution_arn:
1✔
285
                return execution["status"] != ExecutionStatus.RUNNING
1✔
286
        return False
×
287

288
    success = poll_condition(
1✔
289
        condition=_check_last_is_terminal, timeout=120, interval=_get_sampling_interval_seconds()
290
    )
291
    if not success:
1✔
292
        LOG.warning(
×
293
            "Timed out whilst awaiting for execution events to satisfy condition for execution '%s'.",
294
            execution_arn,
295
        )
296

297

298
def await_execution_started(stepfunctions_client, execution_arn: str) -> HistoryEventList:
1✔
299
    def _check_stated_exists(events: HistoryEventList) -> bool:
1✔
300
        for event in events:
1✔
301
            return "executionStartedEventDetails" in event
1✔
302
        return False
×
303

304
    return await_on_execution_events(
1✔
305
        stepfunctions_client=stepfunctions_client,
306
        execution_arn=execution_arn,
307
        check_func=_check_stated_exists,
308
    )
309

310

311
def await_execution_aborted(stepfunctions_client, execution_arn: str):
1✔
312
    def _run_check():
1✔
313
        desc_res = stepfunctions_client.describe_execution(executionArn=execution_arn)
1✔
314
        status: ExecutionStatus = desc_res["status"]
1✔
315
        return status == ExecutionStatus.ABORTED
1✔
316

317
    success = poll_condition(
1✔
318
        condition=_run_check, timeout=120, interval=_get_sampling_interval_seconds()
319
    )
320
    if not success:
1✔
321
        LOG.warning("Timed out whilst awaiting for execution '%s' to abort.", execution_arn)
×
322

323

324
def get_expected_execution_logs(
1✔
325
    stepfunctions_client, log_level: LogLevel, execution_arn: LongArn
326
) -> HistoryEventList:
327
    execution_history = stepfunctions_client.get_execution_history(executionArn=execution_arn)
1✔
328
    execution_history_events = execution_history["events"]
1✔
329
    expected_events = [
1✔
330
        event
331
        for event in execution_history_events
332
        if is_logging_enabled_for(log_level=log_level, history_event_type=event["type"])
333
    ]
334
    return expected_events
1✔
335

336

337
def is_execution_logs_list_complete(
1✔
338
    expected_events: HistoryEventList,
339
) -> Callable[[HistoryEventList], bool]:
340
    def _validation_function(log_events: list) -> bool:
1✔
341
        if not expected_events:
1✔
342
            return True
×
343
        return len(expected_events) == len(log_events)
1✔
344

345
    return _validation_function
1✔
346

347

348
def _await_on_execution_log_stream_created(target_aws_client, log_group_name: str) -> str:
1✔
349
    logs_client = target_aws_client.logs
1✔
350
    log_stream_name = ""
1✔
351

352
    def _run_check():
1✔
353
        nonlocal log_stream_name
354
        try:
1✔
355
            log_streams = logs_client.describe_log_streams(logGroupName=log_group_name)[
1✔
356
                "logStreams"
357
            ]
358
            if not log_streams:
1✔
359
                return False
×
360

361
            log_stream_name = log_streams[-1]["logStreamName"]
1✔
362
            if (
1✔
363
                log_stream_name
364
                == "log_stream_created_by_aws_to_validate_log_delivery_subscriptions"
365
            ):
366
                # SFN has not yet create the log stream for the execution, only the validation steam.
367
                return False
1✔
368
            return True
1✔
369
        except ClientError:
×
370
            return False
×
371

372
    assert poll_condition(condition=_run_check)
1✔
373
    return log_stream_name
1✔
374

375

376
def await_on_execution_logs(
1✔
377
    target_aws_client,
378
    log_group_name: str,
379
    validation_function: Callable[[HistoryEventList], bool] = None,
380
) -> HistoryEventList:
381
    log_stream_name = _await_on_execution_log_stream_created(target_aws_client, log_group_name)
1✔
382

383
    logs_client = target_aws_client.logs
1✔
384
    events: HistoryEventList = []
1✔
385

386
    def _run_check():
1✔
387
        nonlocal events
388
        events.clear()
1✔
389
        try:
1✔
390
            log_events = logs_client.get_log_events(
1✔
391
                logGroupName=log_group_name, logStreamName=log_stream_name, startFromHead=True
392
            )["events"]
393
            events.extend([json.loads(e["message"]) for e in log_events])
1✔
394
        except ClientError:
×
395
            return False
×
396

397
        res = validation_function(events)
1✔
398
        return res
1✔
399

400
    assert poll_condition(condition=_run_check)
1✔
401
    return events
1✔
402

403

404
def create_state_machine_with_iam_role(
1✔
405
    target_aws_client,
406
    create_state_machine_iam_role,
407
    create_state_machine,
408
    snapshot,
409
    definition: Definition,
410
    logging_configuration: LoggingConfiguration | None = None,
411
    state_machine_name: str | None = None,
412
    state_machine_type: StateMachineType = StateMachineType.STANDARD,
413
):
414
    snf_role_arn = create_state_machine_iam_role(target_aws_client=target_aws_client)
1✔
415
    snapshot.add_transformer(RegexTransformer(snf_role_arn, "snf_role_arn"))
1✔
416
    snapshot.add_transformer(
1✔
417
        RegexTransformer(
418
            "Extended Request ID: [a-zA-Z0-9-/=+]+",
419
            "Extended Request ID: <extended_request_id>",
420
        )
421
    )
422
    snapshot.add_transformer(
1✔
423
        RegexTransformer("Request ID: [a-zA-Z0-9-]+", "Request ID: <request_id>")
424
    )
425

426
    sm_name: str = state_machine_name or f"statemachine_create_and_record_execution_{short_uid()}"
1✔
427
    create_arguments = {
1✔
428
        "name": sm_name,
429
        "definition": definition,
430
        "roleArn": snf_role_arn,
431
        "type": state_machine_type,
432
    }
433
    if logging_configuration is not None:
1✔
434
        create_arguments["loggingConfiguration"] = logging_configuration
1✔
435
    creation_resp = create_state_machine(target_aws_client, **create_arguments)
1✔
436
    snapshot.add_transformer(snapshot.transform.sfn_sm_create_arn(creation_resp, 0))
1✔
437
    state_machine_arn = creation_resp["stateMachineArn"]
1✔
438
    return state_machine_arn
1✔
439

440

441
def launch_and_record_execution(
1✔
442
    target_aws_client,
443
    sfn_snapshot,
444
    state_machine_arn,
445
    execution_input,
446
    verify_execution_description=False,
447
) -> LongArn:
448
    stepfunctions_client = target_aws_client.stepfunctions
1✔
449
    exec_resp = stepfunctions_client.start_execution(
1✔
450
        stateMachineArn=state_machine_arn, input=execution_input
451
    )
452
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_exec_arn(exec_resp, 0))
1✔
453
    execution_arn = exec_resp["executionArn"]
1✔
454

455
    await_execution_terminated(
1✔
456
        stepfunctions_client=stepfunctions_client, execution_arn=execution_arn
457
    )
458

459
    if verify_execution_description:
1✔
460
        describe_execution = stepfunctions_client.describe_execution(executionArn=execution_arn)
1✔
461
        sfn_snapshot.match("describe_execution", describe_execution)
1✔
462

463
    get_execution_history = stepfunctions_client.get_execution_history(executionArn=execution_arn)
1✔
464

465
    # Transform all map runs if any.
466
    try:
1✔
467
        map_run_arns = extract_json("$..mapRunArn", get_execution_history)
1✔
468
        if isinstance(map_run_arns, str):
1✔
469
            map_run_arns = [map_run_arns]
1✔
470
        for i, map_run_arn in enumerate(list(set(map_run_arns))):
1✔
471
            sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_map_run_arn(map_run_arn, i))
1✔
472
    except NoSuchJsonPathError:
1✔
473
        # No mapRunArns
474
        pass
1✔
475

476
    sfn_snapshot.match("get_execution_history", get_execution_history)
1✔
477

478
    return execution_arn
1✔
479

480

481
def launch_and_record_mocked_execution(
1✔
482
    target_aws_client,
483
    sfn_snapshot,
484
    state_machine_arn,
485
    execution_input,
486
    test_name,
487
) -> LongArn:
488
    stepfunctions_client = target_aws_client.stepfunctions
1✔
489
    exec_resp = stepfunctions_client.start_execution(
1✔
490
        stateMachineArn=f"{state_machine_arn}#{test_name}", input=execution_input
491
    )
492
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_exec_arn(exec_resp, 0))
1✔
493
    execution_arn = exec_resp["executionArn"]
1✔
494

495
    await_execution_terminated(
1✔
496
        stepfunctions_client=stepfunctions_client, execution_arn=execution_arn
497
    )
498

499
    get_execution_history = stepfunctions_client.get_execution_history(executionArn=execution_arn)
1✔
500

501
    # Transform all map runs if any.
502
    try:
1✔
503
        map_run_arns = extract_json("$..mapRunArn", get_execution_history)
1✔
504
        if isinstance(map_run_arns, str):
1✔
505
            map_run_arns = [map_run_arns]
×
506
        for i, map_run_arn in enumerate(list(set(map_run_arns))):
1✔
507
            sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_map_run_arn(map_run_arn, i))
1✔
508
    except NoSuchJsonPathError:
1✔
509
        # No mapRunArns
510
        pass
1✔
511

512
    sfn_snapshot.match("get_execution_history", get_execution_history)
1✔
513

514
    return execution_arn
1✔
515

516

517
def launch_and_record_mocked_sync_execution(
1✔
518
    target_aws_client,
519
    sfn_snapshot,
520
    state_machine_arn,
521
    execution_input,
522
    test_name,
523
) -> LongArn:
524
    stepfunctions_client = target_aws_client.stepfunctions
1✔
525

526
    exec_resp = stepfunctions_client.start_sync_execution(
1✔
527
        stateMachineArn=f"{state_machine_arn}#{test_name}",
528
        input=execution_input,
529
    )
530

531
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_sync_exec_arn(exec_resp, 0))
1✔
532

533
    sfn_snapshot.match("start_execution_sync_response", exec_resp)
1✔
534

535
    return exec_resp["executionArn"]
1✔
536

537

538
def launch_and_record_logs(
1✔
539
    target_aws_client,
540
    state_machine_arn,
541
    execution_input,
542
    log_level,
543
    log_group_name,
544
    sfn_snapshot,
545
):
546
    execution_arn = launch_and_record_execution(
1✔
547
        target_aws_client,
548
        sfn_snapshot,
549
        state_machine_arn,
550
        execution_input,
551
    )
552
    expected_events = get_expected_execution_logs(
1✔
553
        target_aws_client.stepfunctions, log_level, execution_arn
554
    )
555

556
    if log_level == LogLevel.OFF or not expected_events:
1✔
557
        # The test should terminate here, as no log streams for this execution would have been created.
558
        return
1✔
559

560
    logs_validation_function = is_execution_logs_list_complete(expected_events)
1✔
561
    logged_execution_events = await_on_execution_logs(
1✔
562
        target_aws_client, log_group_name, logs_validation_function
563
    )
564

565
    sfn_snapshot.add_transformer(
1✔
566
        JsonpathTransformer(
567
            jsonpath="$..event_timestamp",
568
            replacement="timestamp",
569
            replace_reference=False,
570
        )
571
    )
572
    sfn_snapshot.match("logged_execution_events", logged_execution_events)
1✔
573

574

575
# TODO refactor to have fewer positional arguments. Consider converting to a fixture.
576
def create_and_record_execution(
1✔
577
    target_aws_client,
578
    create_state_machine_iam_role,
579
    create_state_machine,
580
    sfn_snapshot,
581
    definition,
582
    execution_input,
583
    verify_execution_description=False,
584
) -> LongArn:
585
    state_machine_arn = create_state_machine_with_iam_role(
1✔
586
        target_aws_client,
587
        create_state_machine_iam_role,
588
        create_state_machine,
589
        sfn_snapshot,
590
        definition,
591
    )
592
    exeuction_arn = launch_and_record_execution(
1✔
593
        target_aws_client,
594
        sfn_snapshot,
595
        state_machine_arn,
596
        execution_input,
597
        verify_execution_description,
598
    )
599
    return exeuction_arn
1✔
600

601

602
def create_and_record_mocked_execution(
1✔
603
    target_aws_client,
604
    create_state_machine_iam_role,
605
    create_state_machine,
606
    sfn_snapshot,
607
    definition,
608
    execution_input,
609
    state_machine_name,
610
    test_name,
611
    state_machine_type: StateMachineType = StateMachineType.STANDARD,
612
) -> LongArn:
613
    state_machine_arn = create_state_machine_with_iam_role(
1✔
614
        target_aws_client,
615
        create_state_machine_iam_role,
616
        create_state_machine,
617
        sfn_snapshot,
618
        definition,
619
        state_machine_name=state_machine_name,
620
        state_machine_type=state_machine_type,
621
    )
622
    execution_arn = launch_and_record_mocked_execution(
1✔
623
        target_aws_client, sfn_snapshot, state_machine_arn, execution_input, test_name
624
    )
625
    return execution_arn
1✔
626

627

628
def create_and_record_mocked_sync_execution(
1✔
629
    target_aws_client,
630
    create_state_machine_iam_role,
631
    create_state_machine,
632
    sfn_snapshot,
633
    definition,
634
    execution_input,
635
    state_machine_name,
636
    test_name,
637
) -> LongArn:
638
    state_machine_arn = create_state_machine_with_iam_role(
1✔
639
        target_aws_client,
640
        create_state_machine_iam_role,
641
        create_state_machine,
642
        sfn_snapshot,
643
        definition,
644
        state_machine_name=state_machine_name,
645
        state_machine_type=StateMachineType.EXPRESS,
646
    )
647
    execution_arn = launch_and_record_mocked_sync_execution(
1✔
648
        target_aws_client, sfn_snapshot, state_machine_arn, execution_input, test_name
649
    )
650
    return execution_arn
1✔
651

652

653
def create_and_run_mock(
1✔
654
    target_aws_client,
655
    monkeypatch,
656
    mock_config_file,
657
    mock_config: dict,
658
    state_machine_name: str,
659
    definition_template: dict,
660
    execution_input: str,
661
    test_name: str,
662
):
663
    mock_config_file_path = mock_config_file(mock_config)
1✔
664
    monkeypatch.setattr(config, "SFN_MOCK_CONFIG", mock_config_file_path)
1✔
665

666
    sfn_client = target_aws_client.stepfunctions
1✔
667

668
    state_machine_name: str = state_machine_name or f"mocked_statemachine_{short_uid()}"
1✔
669
    definition = json.dumps(definition_template)
1✔
670
    creation_response = sfn_client.create_state_machine(
1✔
671
        name=state_machine_name,
672
        definition=definition,
673
        roleArn="arn:aws:iam::111111111111:role/mock-role/mocked-run",
674
    )
675
    state_machine_arn = creation_response["stateMachineArn"]
1✔
676

677
    test_case_arn = f"{state_machine_arn}#{test_name}"
1✔
678
    execution = sfn_client.start_execution(stateMachineArn=test_case_arn, input=execution_input)
1✔
679
    execution_arn = execution["executionArn"]
1✔
680

681
    await_execution_terminated(stepfunctions_client=sfn_client, execution_arn=execution_arn)
1✔
682
    sfn_client.delete_state_machine(stateMachineArn=state_machine_arn)
1✔
683

684
    return execution_arn
1✔
685

686

687
def create_and_record_logs(
1✔
688
    target_aws_client,
689
    create_state_machine_iam_role,
690
    create_state_machine,
691
    sfn_create_log_group,
692
    sfn_snapshot,
693
    definition,
694
    execution_input,
695
    log_level: LogLevel,
696
    include_execution_data: bool,
697
):
698
    state_machine_arn = create_state_machine_with_iam_role(
1✔
699
        target_aws_client,
700
        create_state_machine_iam_role,
701
        create_state_machine,
702
        sfn_snapshot,
703
        definition,
704
    )
705

706
    log_group_name = sfn_create_log_group()
1✔
707
    log_group_arn = target_aws_client.logs.describe_log_groups(logGroupNamePrefix=log_group_name)[
1✔
708
        "logGroups"
709
    ][0]["arn"]
710
    logging_configuration = LoggingConfiguration(
1✔
711
        level=log_level,
712
        includeExecutionData=include_execution_data,
713
        destinations=[
714
            LogDestination(
715
                cloudWatchLogsLogGroup=CloudWatchLogsLogGroup(logGroupArn=log_group_arn)
716
            ),
717
        ],
718
    )
719
    target_aws_client.stepfunctions.update_state_machine(
1✔
720
        stateMachineArn=state_machine_arn, loggingConfiguration=logging_configuration
721
    )
722

723
    launch_and_record_logs(
1✔
724
        target_aws_client,
725
        state_machine_arn,
726
        execution_input,
727
        log_level,
728
        log_group_name,
729
        sfn_snapshot,
730
    )
731

732

733
def launch_and_record_sync_execution(
1✔
734
    target_aws_client,
735
    sfn_snapshot,
736
    state_machine_arn,
737
    execution_input,
738
):
739
    exec_resp = target_aws_client.stepfunctions.start_sync_execution(
1✔
740
        stateMachineArn=state_machine_arn,
741
        input=execution_input,
742
    )
743
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_sync_exec_arn(exec_resp, 0))
1✔
744
    sfn_snapshot.match("start_execution_sync_response", exec_resp)
1✔
745

746

747
def create_and_record_express_sync_execution(
1✔
748
    target_aws_client,
749
    create_state_machine_iam_role,
750
    create_state_machine,
751
    sfn_snapshot,
752
    definition,
753
    execution_input,
754
):
755
    snf_role_arn = create_state_machine_iam_role(target_aws_client=target_aws_client)
1✔
756
    sfn_snapshot.add_transformer(RegexTransformer(snf_role_arn, "sfn_role_arn"))
1✔
757

758
    creation_response = create_state_machine(
1✔
759
        target_aws_client,
760
        name=f"express_statemachine_{short_uid()}",
761
        definition=definition,
762
        roleArn=snf_role_arn,
763
        type=StateMachineType.EXPRESS,
764
    )
765
    state_machine_arn = creation_response["stateMachineArn"]
1✔
766
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_create_arn(creation_response, 0))
1✔
767
    sfn_snapshot.match("creation_response", creation_response)
1✔
768

769
    launch_and_record_sync_execution(
1✔
770
        target_aws_client,
771
        sfn_snapshot,
772
        state_machine_arn,
773
        execution_input,
774
    )
775

776

777
def launch_and_record_express_async_execution(
1✔
778
    target_aws_client,
779
    sfn_snapshot,
780
    state_machine_arn,
781
    log_group_name,
782
    execution_input,
783
):
784
    start_execution = target_aws_client.stepfunctions.start_execution(
1✔
785
        stateMachineArn=state_machine_arn, input=execution_input
786
    )
787
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_express_exec_arn(start_execution, 0))
1✔
788
    execution_arn = start_execution["executionArn"]
1✔
789

790
    event_list = await_on_execution_logs(
1✔
791
        target_aws_client, log_group_name, validation_function=_is_last_history_event_terminal
792
    )
793
    # Snapshot only the end event, as AWS StepFunctions implements a flaky approach to logging previous events.
794
    end_event = event_list[-1]
1✔
795
    sfn_snapshot.match("end_event", end_event)
1✔
796

797
    return execution_arn
1✔
798

799

800
def create_and_record_express_async_execution(
1✔
801
    target_aws_client,
802
    create_state_machine_iam_role,
803
    create_state_machine,
804
    sfn_create_log_group,
805
    sfn_snapshot,
806
    definition,
807
    execution_input,
808
    include_execution_data: bool = True,
809
) -> tuple[LongArn, LongArn]:
810
    snf_role_arn = create_state_machine_iam_role(target_aws_client)
1✔
811
    sfn_snapshot.add_transformer(RegexTransformer(snf_role_arn, "sfn_role_arn"))
1✔
812

813
    log_group_name = sfn_create_log_group()
1✔
814
    log_group_arn = target_aws_client.logs.describe_log_groups(logGroupNamePrefix=log_group_name)[
1✔
815
        "logGroups"
816
    ][0]["arn"]
817
    logging_configuration = LoggingConfiguration(
1✔
818
        level=LogLevel.ALL,
819
        includeExecutionData=include_execution_data,
820
        destinations=[
821
            LogDestination(
822
                cloudWatchLogsLogGroup=CloudWatchLogsLogGroup(logGroupArn=log_group_arn)
823
            ),
824
        ],
825
    )
826

827
    creation_response = create_state_machine(
1✔
828
        target_aws_client,
829
        name=f"express_statemachine_{short_uid()}",
830
        definition=definition,
831
        roleArn=snf_role_arn,
832
        type=StateMachineType.EXPRESS,
833
        loggingConfiguration=logging_configuration,
834
    )
835
    state_machine_arn = creation_response["stateMachineArn"]
1✔
836
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_create_arn(creation_response, 0))
1✔
837
    sfn_snapshot.match("creation_response", creation_response)
1✔
838

839
    execution_arn = launch_and_record_express_async_execution(
1✔
840
        target_aws_client,
841
        sfn_snapshot,
842
        state_machine_arn,
843
        log_group_name,
844
        execution_input,
845
    )
846
    return state_machine_arn, execution_arn
1✔
847

848

849
def create_and_record_events(
1✔
850
    create_state_machine_iam_role,
851
    create_state_machine,
852
    sfn_events_to_sqs_queue,
853
    target_aws_client,
854
    sfn_snapshot,
855
    definition,
856
    execution_input,
857
):
858
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sqs_integration())
1✔
859
    sfn_snapshot.add_transformers_list(
1✔
860
        [
861
            JsonpathTransformer(
862
                jsonpath="$..detail.startDate",
863
                replacement="start-date",
864
                replace_reference=False,
865
            ),
866
            JsonpathTransformer(
867
                jsonpath="$..detail.stopDate",
868
                replacement="stop-date",
869
                replace_reference=False,
870
            ),
871
            JsonpathTransformer(
872
                jsonpath="$..detail.name",
873
                replacement="test_event_bridge_events-{short_uid()}",
874
                replace_reference=False,
875
            ),
876
        ]
877
    )
878

879
    snf_role_arn = create_state_machine_iam_role(target_aws_client)
1✔
880
    create_output: CreateStateMachineOutput = create_state_machine(
1✔
881
        target_aws_client,
882
        name=f"test_event_bridge_events-{short_uid()}",
883
        definition=definition,
884
        roleArn=snf_role_arn,
885
    )
886
    state_machine_arn = create_output["stateMachineArn"]
1✔
887

888
    queue_url = sfn_events_to_sqs_queue(state_machine_arn=state_machine_arn)
1✔
889

890
    start_execution = target_aws_client.stepfunctions.start_execution(
1✔
891
        stateMachineArn=state_machine_arn, input=execution_input
892
    )
893
    execution_arn = start_execution["executionArn"]
1✔
894
    await_execution_terminated(
1✔
895
        stepfunctions_client=target_aws_client.stepfunctions, execution_arn=execution_arn
896
    )
897

898
    stepfunctions_events = []
1✔
899

900
    def _get_events():
1✔
901
        received = target_aws_client.sqs.receive_message(QueueUrl=queue_url)
1✔
902
        for message in received.get("Messages", []):
1✔
903
            body = json.loads(message["Body"])
1✔
904
            stepfunctions_events.append(body)
1✔
905
        stepfunctions_events.sort(key=lambda e: e["time"])
1✔
906
        return stepfunctions_events and stepfunctions_events[-1]["detail"]["status"] != "RUNNING"
1✔
907

908
    poll_condition(_get_events, timeout=60)
1✔
909

910
    sfn_snapshot.match("stepfunctions_events", stepfunctions_events)
1✔
911

912

913
def record_sqs_events(target_aws_client, queue_url, sfn_snapshot, num_events):
1✔
914
    stepfunctions_events = []
1✔
915

916
    def _get_events():
1✔
917
        received = target_aws_client.sqs.receive_message(QueueUrl=queue_url)
1✔
918
        for message in received.get("Messages", []):
1✔
919
            body = json.loads(message["Body"])
1✔
920
            stepfunctions_events.append(body)
1✔
921
        stepfunctions_events.sort(key=lambda e: e["time"])
1✔
922
        return len(stepfunctions_events) == num_events
1✔
923

924
    poll_condition(_get_events, timeout=60)
1✔
925
    stepfunctions_events.sort(key=lambda e: json.dumps(e.get("detail", {})))
1✔
926
    sfn_snapshot.match("stepfunctions_events", stepfunctions_events)
1✔
927

928

929
class SfnNoneRecursiveParallelTransformer:
1✔
930
    """
931
    Normalises a sublist of events triggered in by a Parallel state to be order-independent.
932
    """
933

934
    def __init__(self, events_jsonpath: str = "$..events"):
1✔
935
        self.events_jsonpath: str = events_jsonpath
1✔
936

937
    @staticmethod
1✔
938
    def _normalise_events(events: list[dict]) -> None:
1✔
939
        start_idx = None
1✔
940
        sublist = []
1✔
941
        in_sublist = False
1✔
942
        for i, event in enumerate(events):
1✔
943
            event_type = event.get("type")
1✔
944
            if event_type is None:
1✔
945
                LOG.debug(
×
946
                    "No 'type' in event item '%s'.",
947
                    event,
948
                )
949
                in_sublist = False
×
950

951
            elif event_type in {
1✔
952
                None,
953
                HistoryEventType.ParallelStateSucceeded,
954
                HistoryEventType.ParallelStateAborted,
955
                HistoryEventType.ParallelStateExited,
956
                HistoryEventType.ParallelStateFailed,
957
            }:
958
                events[start_idx:i] = sorted(sublist, key=lambda e: to_json_str(e))
1✔
959
                in_sublist = False
1✔
960
            elif event_type == HistoryEventType.ParallelStateStarted:
1✔
961
                in_sublist = True
1✔
962
                sublist = []
1✔
963
                start_idx = i + 1
1✔
964
            elif in_sublist:
1✔
965
                event["id"] = (0,)
1✔
966
                event["previousEventId"] = 0
1✔
967
                sublist.append(event)
1✔
968

969
    def transform(self, input_data: dict, *, ctx: TransformContext) -> dict:
1✔
970
        pattern = parse("$..events")
1✔
971
        events = pattern.find(input_data)
1✔
972
        if not events:
1✔
973
            LOG.debug("No Stepfunctions 'events' for jsonpath '%s'.", self.events_jsonpath)
×
974
            return input_data
×
975

976
        for events_data in events:
1✔
977
            self._normalise_events(events_data.value)
1✔
978

979
        return input_data
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc