• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / cd952d69-b410-43ae-9e84-3abe1d35489e

23 May 2025 08:30AM UTC coverage: 86.663% (-0.001%) from 86.664%
cd952d69-b410-43ae-9e84-3abe1d35489e

push

circleci

web-flow
Include reserved_concurrent_executions in get_function response (#12654)

3 of 3 new or added lines in 1 file covered. (100.0%)

22 existing lines in 11 files now uncovered.

64552 of 74486 relevant lines covered (86.66%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.21
/localstack-core/localstack/testing/pytest/stepfunctions/utils.py
1
import json
1✔
2
import logging
1✔
3
from typing import Callable, Final, Optional
1✔
4

5
from botocore.exceptions import ClientError
1✔
6
from jsonpath_ng.ext import parse
1✔
7
from localstack_snapshot.snapshots.transformer import (
1✔
8
    JsonpathTransformer,
9
    RegexTransformer,
10
    TransformContext,
11
)
12

13
from localstack import config
1✔
14
from localstack.aws.api.stepfunctions import (
1✔
15
    Arn,
16
    CloudWatchLogsLogGroup,
17
    CreateStateMachineOutput,
18
    Definition,
19
    ExecutionStatus,
20
    HistoryEventList,
21
    HistoryEventType,
22
    LogDestination,
23
    LoggingConfiguration,
24
    LogLevel,
25
    LongArn,
26
    StateMachineType,
27
)
28
from localstack.services.stepfunctions.asl.eval.event.logging import is_logging_enabled_for
1✔
29
from localstack.services.stepfunctions.asl.utils.encoding import to_json_str
1✔
30
from localstack.services.stepfunctions.asl.utils.json_path import NoSuchJsonPathError, extract_json
1✔
31
from localstack.testing.aws.util import is_aws_cloud
1✔
32
from localstack.utils.strings import short_uid
1✔
33
from localstack.utils.sync import poll_condition
1✔
34

35
LOG = logging.getLogger(__name__)
1✔
36

37

38
# For EXPRESS state machines, the deletion will happen eventually (usually less than a minute).
39
# Running executions may emit logs after DeleteStateMachine API is called.
40
_DELETION_TIMEOUT_SECS: Final[int] = 120
1✔
41
_SAMPLING_INTERVAL_SECONDS_AWS_CLOUD: Final[int] = 1
1✔
42
_SAMPLING_INTERVAL_SECONDS_LOCALSTACK: Final[float] = 0.2
1✔
43

44

45
def _get_sampling_interval_seconds() -> int | float:
    """Return the polling interval, in seconds, for the current test target.

    The AWS-cloud interval is returned when ``is_aws_cloud()`` is truthy;
    otherwise the (shorter) LocalStack interval is returned.
    """
    if is_aws_cloud():
        return _SAMPLING_INTERVAL_SECONDS_AWS_CLOUD
    return _SAMPLING_INTERVAL_SECONDS_LOCALSTACK
51

52

53
def await_no_state_machines_listed(stepfunctions_client):
    """Poll ``list_state_machines`` until it reports no state machines.

    Polling is bounded by ``_DELETION_TIMEOUT_SECS``. When the poll does not
    succeed a warning is logged; no exception is raised.

    :param stepfunctions_client: client exposing ``list_state_machines()``.
    """

    def _no_state_machines_left():
        listing = stepfunctions_client.list_state_machines()
        return not listing["stateMachines"]

    listing_is_empty = poll_condition(
        condition=_no_state_machines_left,
        timeout=_DELETION_TIMEOUT_SECS,
        interval=_get_sampling_interval_seconds(),
    )
    if listing_is_empty:
        return
    LOG.warning("Timed out whilst awaiting for listing to be empty.")
66

67

68
def _is_state_machine_alias_listed(
    stepfunctions_client, state_machine_arn: Arn, state_machine_alias_arn: Arn
):
    """Tell whether an alias ARN appears among a state machine's listed aliases.

    Only the single ``list_state_machine_aliases`` response is inspected; no
    pagination token is followed.

    :param stepfunctions_client: client exposing ``list_state_machine_aliases``.
    :param state_machine_arn: ARN of the state machine whose aliases are listed.
    :param state_machine_alias_arn: alias ARN to look for.
    :return: ``True`` if an entry's ``stateMachineAliasArn`` equals the given
        alias ARN, ``False`` otherwise.
    """
    response = stepfunctions_client.list_state_machine_aliases(
        stateMachineArn=state_machine_arn
    )
    return any(
        alias["stateMachineAliasArn"] == state_machine_alias_arn
        for alias in response["stateMachineAliases"]
    )
79

80

81
def await_state_machine_alias_is_created(
    stepfunctions_client, state_machine_arn: Arn, state_machine_alias_arn: Arn
):
    """Poll until the alias ARN is listed among the state machine's aliases.

    Polling is bounded by ``_DELETION_TIMEOUT_SECS``. When the poll does not
    succeed a warning naming both ARNs is logged; no exception is raised.

    :param stepfunctions_client: client exposing ``list_state_machine_aliases``.
    :param state_machine_arn: ARN of the state machine owning the alias.
    :param state_machine_alias_arn: ARN of the alias expected to appear.
    """
    success = poll_condition(
        condition=lambda: _is_state_machine_alias_listed(
            stepfunctions_client=stepfunctions_client,
            state_machine_arn=state_machine_arn,
            state_machine_alias_arn=state_machine_alias_arn,
        ),
        timeout=_DELETION_TIMEOUT_SECS,
        interval=_get_sampling_interval_seconds(),
    )
    if not success:
        # The previous message ("listing to be empty") was copied from another
        # helper and described neither the alias nor the awaited condition.
        LOG.warning(
            "Timed out whilst awaiting for aliases of '%s' to include '%s'.",
            state_machine_arn,
            state_machine_alias_arn,
        )
95

96

97
def await_state_machine_alias_is_deleted(
    stepfunctions_client, state_machine_arn: Arn, state_machine_alias_arn: Arn
):
    """Poll until the alias ARN is no longer listed for the state machine.

    Polling is bounded by ``_DELETION_TIMEOUT_SECS``. When the poll does not
    succeed a warning naming both ARNs is logged; no exception is raised.

    :param stepfunctions_client: client exposing ``list_state_machine_aliases``.
    :param state_machine_arn: ARN of the state machine owning the alias.
    :param state_machine_alias_arn: ARN of the alias expected to disappear.
    """
    success = poll_condition(
        condition=lambda: not _is_state_machine_alias_listed(
            stepfunctions_client=stepfunctions_client,
            state_machine_arn=state_machine_arn,
            state_machine_alias_arn=state_machine_alias_arn,
        ),
        timeout=_DELETION_TIMEOUT_SECS,
        interval=_get_sampling_interval_seconds(),
    )
    if not success:
        # The previous message ("listing to be empty") was copied from another
        # helper and described neither the alias nor the awaited condition.
        LOG.warning(
            "Timed out whilst awaiting for aliases of '%s' to exclude '%s'.",
            state_machine_arn,
            state_machine_alias_arn,
        )
111

112

113
def _is_state_machine_listed(stepfunctions_client, state_machine_arn: str) -> bool:
1✔
114
    lst_resp = stepfunctions_client.list_state_machines()
1✔
115
    state_machines = lst_resp["stateMachines"]
1✔
116
    for state_machine in state_machines:
1✔
117
        if state_machine["stateMachineArn"] == state_machine_arn:
1✔
118
            return True
1✔
119
    return False
1✔
120

121

122
def _is_state_machine_version_listed(
1✔
123
    stepfunctions_client, state_machine_arn: str, state_machine_version_arn: str
124
) -> bool:
125
    lst_resp = stepfunctions_client.list_state_machine_versions(stateMachineArn=state_machine_arn)
1✔
126
    versions = lst_resp["stateMachineVersions"]
1✔
127
    for version in versions:
1✔
128
        if version["stateMachineVersionArn"] == state_machine_version_arn:
1✔
129
            return True
1✔
130
    return False
1✔
131

132

133
def await_state_machine_not_listed(stepfunctions_client, state_machine_arn: str):
    """Poll until ``state_machine_arn`` is absent from the state machine listing.

    Polling is bounded by ``_DELETION_TIMEOUT_SECS``. When the poll does not
    succeed a warning is logged; no exception is raised.
    """

    def _is_absent():
        return not _is_state_machine_listed(stepfunctions_client, state_machine_arn)

    absent = poll_condition(
        condition=_is_absent,
        timeout=_DELETION_TIMEOUT_SECS,
        interval=_get_sampling_interval_seconds(),
    )
    if absent:
        return
    LOG.warning("Timed out whilst awaiting for listing to exclude '%s'.", state_machine_arn)
141

142

143
def await_state_machine_listed(stepfunctions_client, state_machine_arn: str):
    """Poll until ``state_machine_arn`` is present in the state machine listing.

    Polling is bounded by ``_DELETION_TIMEOUT_SECS``. When the poll does not
    succeed a warning is logged; no exception is raised.
    """

    def _is_present():
        return _is_state_machine_listed(stepfunctions_client, state_machine_arn)

    present = poll_condition(
        condition=_is_present,
        timeout=_DELETION_TIMEOUT_SECS,
        interval=_get_sampling_interval_seconds(),
    )
    if present:
        return
    LOG.warning("Timed out whilst awaiting for listing to include '%s'.", state_machine_arn)
151

152

153
def await_state_machine_version_not_listed(
    stepfunctions_client, state_machine_arn: str, state_machine_version_arn: str
):
    """Poll until the version ARN is absent from the state machine's versions.

    Polling is bounded by ``_DELETION_TIMEOUT_SECS``. When the poll does not
    succeed a warning is logged; no exception is raised.
    """

    def _version_is_absent():
        return not _is_state_machine_version_listed(
            stepfunctions_client, state_machine_arn, state_machine_version_arn
        )

    absent = poll_condition(
        condition=_version_is_absent,
        timeout=_DELETION_TIMEOUT_SECS,
        interval=_get_sampling_interval_seconds(),
    )
    if absent:
        return
    LOG.warning(
        "Timed out whilst awaiting for version of %s to exclude '%s'.",
        state_machine_arn,
        state_machine_version_arn,
    )
169

170

171
def await_state_machine_version_listed(
    stepfunctions_client, state_machine_arn: str, state_machine_version_arn: str
):
    """Poll until the version ARN is present among the state machine's versions.

    Polling is bounded by ``_DELETION_TIMEOUT_SECS``. When the poll does not
    succeed a warning is logged; no exception is raised.
    """

    def _version_is_present():
        return _is_state_machine_version_listed(
            stepfunctions_client, state_machine_arn, state_machine_version_arn
        )

    present = poll_condition(
        condition=_version_is_present,
        timeout=_DELETION_TIMEOUT_SECS,
        interval=_get_sampling_interval_seconds(),
    )
    if present:
        return
    LOG.warning(
        "Timed out whilst awaiting for version of %s to include '%s'.",
        state_machine_arn,
        state_machine_version_arn,
    )
187

188

189
def await_on_execution_events(
    stepfunctions_client, execution_arn: str, check_func: Callable[[HistoryEventList], bool]
) -> HistoryEventList:
    """Poll an execution's history until ``check_func`` accepts it.

    On every attempt the history is fetched again with
    ``get_execution_history``, its events are sorted by their ``timestamp``
    entry and passed to ``check_func``. A ``ClientError`` from the fetch counts
    as a failed attempt. Only the single response is read; no pagination token
    is followed.

    :param stepfunctions_client: client exposing ``get_execution_history``.
    :param execution_arn: ARN of the execution to inspect.
    :param check_func: predicate over the sorted event list.
    :return: the event list from the last attempt.
    :raises AssertionError: if the poll (120 second timeout) does not succeed.
    """
    collected: HistoryEventList = []

    def _fetch_and_check():
        # Each attempt starts from an empty list so earlier results never leak through.
        collected.clear()
        try:
            history = stepfunctions_client.get_execution_history(executionArn=execution_arn)
        except ClientError:
            return False
        ordered = sorted(history.get("events", []), key=lambda item: item.get("timestamp"))
        collected.extend(ordered)
        verdict: bool = check_func(collected)
        return verdict

    assert poll_condition(
        condition=_fetch_and_check, timeout=120, interval=_get_sampling_interval_seconds()
    )
    return collected
209

210

211
def await_execution_success(stepfunctions_client, execution_arn: str) -> HistoryEventList:
    """Wait until the last history event of the execution reports success.

    Success is detected by the presence of the
    ``executionSucceededEventDetails`` key in the last event.

    :return: the execution's history events, as returned by
        ``await_on_execution_events``.
    """

    def _ends_with_success(events: HistoryEventList) -> bool:
        if not events:
            return False
        return "executionSucceededEventDetails" in events[-1]

    history = await_on_execution_events(
        stepfunctions_client=stepfunctions_client,
        execution_arn=execution_arn,
        check_func=_ends_with_success,
    )
    return history
223

224

225
def await_list_execution_status(
    stepfunctions_client, state_machine_arn: str, execution_arn: str, status: str
):
    """Poll ``list_executions`` until the execution is listed with ``status``.

    Needed because ``list_executions`` is eventually consistent with respect to
    ``describe_execution`` and ``get_execution_history``. The poll is bounded
    by a 120 second timeout; when it does not succeed a warning is logged and
    no exception is raised.
    """

    def _is_listed_with_status():
        response = stepfunctions_client.list_executions(
            stateMachineArn=state_machine_arn, statusFilter=status
        )
        return any(
            item["executionArn"] == execution_arn and item["status"] == status
            for item in response.get("executions", [])
        )

    found = poll_condition(
        condition=_is_listed_with_status,
        timeout=120,
        interval=_get_sampling_interval_seconds(),
    )
    if found:
        return
    LOG.warning(
        "Timed out whilst awaiting for execution status %s to satisfy condition for execution '%s'.",
        status,
        execution_arn,
    )
249

250

251
def _is_last_history_event_terminal(events: HistoryEventList) -> bool:
    """Tell whether the last event in ``events`` ends the execution.

    :param events: history events; the last element is the one inspected.
    :return: ``False`` for an empty list; otherwise ``True`` when the last
        event has no ``type`` entry or its type is one of ExecutionFailed,
        ExecutionAborted, ExecutionTimedOut or ExecutionSucceeded.
    """
    if not events:
        return False
    final_type = events[-1].get("type")
    if final_type is None:
        # An event without a "type" entry is counted as terminal as well.
        return True
    return final_type in {
        HistoryEventType.ExecutionFailed,
        HistoryEventType.ExecutionAborted,
        HistoryEventType.ExecutionTimedOut,
        HistoryEventType.ExecutionSucceeded,
    }
262

263

264
def await_execution_terminated(stepfunctions_client, execution_arn: str) -> HistoryEventList:
    """Wait until the execution's last history event is terminal.

    Delegates to ``await_on_execution_events`` with
    ``_is_last_history_event_terminal`` as the acceptance check.

    :return: the execution's history events from the accepted attempt.
    """
    history = await_on_execution_events(
        stepfunctions_client=stepfunctions_client,
        execution_arn=execution_arn,
        check_func=_is_last_history_event_terminal,
    )
    return history
270

271

272
def await_execution_lists_terminated(
    stepfunctions_client, state_machine_arn: str, execution_arn: str
):
    """Poll ``list_executions`` until the execution is listed as not RUNNING.

    The poll is bounded by a 120 second timeout; when it does not succeed a
    warning is logged and no exception is raised.
    """

    def _is_listed_as_not_running() -> bool:
        listing = stepfunctions_client.list_executions(stateMachineArn=state_machine_arn)
        for item in listing["executions"]:
            if item["executionArn"] != execution_arn:
                continue
            # First matching entry decides the outcome.
            return item["status"] != ExecutionStatus.RUNNING
        return False

    terminated = poll_condition(
        condition=_is_listed_as_not_running,
        timeout=120,
        interval=_get_sampling_interval_seconds(),
    )
    if terminated:
        return
    LOG.warning(
        "Timed out whilst awaiting for execution events to satisfy condition for execution '%s'.",
        execution_arn,
    )
291

292

293
def await_execution_started(stepfunctions_client, execution_arn: str) -> HistoryEventList:
    """Wait until the execution history contains an execution-started event.

    A started event is recognised by the presence of the
    ``executionStartedEventDetails`` key.

    :param stepfunctions_client: client exposing ``get_execution_history``.
    :param execution_arn: ARN of the execution to inspect.
    :return: the execution's history events from the accepted attempt.
    """

    def _check_started_exists(events: HistoryEventList) -> bool:
        # The previous loop returned on its first iteration, so only the first
        # event was ever inspected; scan the whole list instead.
        return any("executionStartedEventDetails" in event for event in events)

    return await_on_execution_events(
        stepfunctions_client=stepfunctions_client,
        execution_arn=execution_arn,
        check_func=_check_started_exists,
    )
304

305

306
def await_execution_aborted(stepfunctions_client, execution_arn: str):
    """Poll ``describe_execution`` until the execution's status is ABORTED.

    The poll is bounded by a 120 second timeout; when it does not succeed a
    warning is logged and no exception is raised.
    """

    def _is_aborted():
        description = stepfunctions_client.describe_execution(executionArn=execution_arn)
        current: ExecutionStatus = description["status"]
        return current == ExecutionStatus.ABORTED

    aborted = poll_condition(
        condition=_is_aborted, timeout=120, interval=_get_sampling_interval_seconds()
    )
    if aborted:
        return
    LOG.warning("Timed out whilst awaiting for execution '%s' to abort.", execution_arn)
317

318

319
def get_expected_execution_logs(
    stepfunctions_client, log_level: LogLevel, execution_arn: LongArn
) -> HistoryEventList:
    """Return the history events that ``log_level`` is expected to log.

    The execution history is fetched once and filtered with
    ``is_logging_enabled_for`` applied to each event's ``type``.

    :param stepfunctions_client: client exposing ``get_execution_history``.
    :param log_level: log level used for filtering.
    :param execution_arn: ARN of the execution whose history is read.
    :return: the retained events, in history order.
    """
    history = stepfunctions_client.get_execution_history(executionArn=execution_arn)
    loggable: HistoryEventList = []
    for event in history["events"]:
        if is_logging_enabled_for(log_level=log_level, history_event_type=event["type"]):
            loggable.append(event)
    return loggable
330

331

332
def is_execution_logs_list_complete(
    expected_events: HistoryEventList,
) -> Callable[[HistoryEventList], bool]:
    """Build a predicate that checks a log-event list for completeness.

    The returned predicate accepts any list when ``expected_events`` is empty;
    otherwise it accepts a list whose length equals that of
    ``expected_events``. Only lengths are compared, not contents.
    """

    def _has_expected_length(log_events: list) -> bool:
        return not expected_events or len(expected_events) == len(log_events)

    return _has_expected_length
341

342

343
def _await_on_execution_log_stream_created(target_aws_client, log_group_name: str) -> str:
    """Wait for an execution log stream in the log group and return its name.

    Each attempt describes the group's log streams and looks at the last one.
    A ``ClientError``, an empty stream list, or a last stream that is the AWS
    delivery-validation stream all count as failed attempts. The poll uses
    ``poll_condition``'s default timeout and interval.

    :param target_aws_client: client factory exposing a ``logs`` client.
    :param log_group_name: name of the log group to inspect.
    :return: name of the last listed log stream from the accepted attempt.
    :raises AssertionError: if the poll does not succeed.
    """
    logs_client = target_aws_client.logs
    log_stream_name = ""

    def _stream_is_ready():
        nonlocal log_stream_name
        try:
            response = logs_client.describe_log_streams(logGroupName=log_group_name)
        except ClientError:
            return False
        streams = response["logStreams"]
        if not streams:
            return False

        log_stream_name = streams[-1]["logStreamName"]
        # While only the validation stream exists, SFN has not yet created the
        # log stream for the execution.
        return log_stream_name != "log_stream_created_by_aws_to_validate_log_delivery_subscriptions"

    assert poll_condition(condition=_stream_is_ready)
    return log_stream_name
369

370

371
def await_on_execution_logs(
    target_aws_client,
    log_group_name: str,
    validation_function: Optional[Callable[[HistoryEventList], bool]] = None,
) -> HistoryEventList:
    """Poll the execution's log stream until its events pass validation.

    First waits for the execution log stream to exist, then repeatedly reads
    the stream from its head, JSON-decodes every event ``message`` and passes
    the decoded list to ``validation_function``. A ``ClientError`` while
    reading counts as a failed attempt. The poll uses ``poll_condition``'s
    default timeout and interval.

    :param target_aws_client: client factory exposing a ``logs`` client.
    :param log_group_name: name of the log group holding the execution logs.
    :param validation_function: predicate over the decoded events. When
        ``None`` the first successful read is accepted as-is.
    :return: the decoded log events from the accepted attempt.
    :raises AssertionError: if the poll does not succeed.
    """
    log_stream_name = _await_on_execution_log_stream_created(target_aws_client, log_group_name)

    logs_client = target_aws_client.logs
    events: HistoryEventList = []

    def _run_check():
        # Each attempt starts from an empty list so earlier reads never leak through.
        events.clear()
        try:
            log_events = logs_client.get_log_events(
                logGroupName=log_group_name, logStreamName=log_stream_name, startFromHead=True
            )["events"]
            events.extend([json.loads(e["message"]) for e in log_events])
        except ClientError:
            return False

        if validation_function is None:
            # Calling the ``None`` default used to raise TypeError; treat it as
            # "no validation requested" instead.
            return True
        return validation_function(events)

    assert poll_condition(condition=_run_check)
    return events
397

398

399
def create_state_machine_with_iam_role(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    snapshot,
    definition: Definition,
    logging_configuration: Optional[LoggingConfiguration] = None,
    state_machine_name: Optional[str] = None,
):
    """Create an IAM role and a state machine using it, and return the machine's ARN.

    Registers snapshot transformers for the role ARN, for request-id strings,
    and for the created state machine's ARN.

    :param target_aws_client: client factory handed to both factory callables.
    :param create_state_machine_iam_role: callable returning a role ARN.
    :param create_state_machine: callable creating the state machine.
    :param snapshot: snapshot object receiving the transformers.
    :param definition: state machine definition.
    :param logging_configuration: passed as ``loggingConfiguration`` when not ``None``.
    :param state_machine_name: name to use; a generated name is used when falsy.
    :return: the ``stateMachineArn`` of the creation response.
    """
    snf_role_arn = create_state_machine_iam_role(target_aws_client=target_aws_client)
    snapshot.add_transformer(RegexTransformer(snf_role_arn, "snf_role_arn"))
    request_id_rules = (
        ("Extended Request ID: [a-zA-Z0-9-/=+]+", "Extended Request ID: <extended_request_id>"),
        ("Request ID: [a-zA-Z0-9-]+", "Request ID: <request_id>"),
    )
    for pattern, placeholder in request_id_rules:
        snapshot.add_transformer(RegexTransformer(pattern, placeholder))

    if state_machine_name:
        sm_name: str = state_machine_name
    else:
        sm_name = f"statemachine_create_and_record_execution_{short_uid()}"

    create_arguments = dict(name=sm_name, definition=definition, roleArn=snf_role_arn)
    if logging_configuration is not None:
        create_arguments["loggingConfiguration"] = logging_configuration

    creation_resp = create_state_machine(target_aws_client, **create_arguments)
    snapshot.add_transformer(snapshot.transform.sfn_sm_create_arn(creation_resp, 0))
    return creation_resp["stateMachineArn"]
432

433

434
def launch_and_record_execution(
    target_aws_client,
    sfn_snapshot,
    state_machine_arn,
    execution_input,
    verify_execution_description=False,
) -> LongArn:
    """Start an execution, wait for it to terminate and snapshot its history.

    :param target_aws_client: client factory exposing a ``stepfunctions`` client.
    :param sfn_snapshot: snapshot object receiving transformers and matches.
    :param state_machine_arn: ARN of the state machine to execute.
    :param execution_input: input string passed to ``start_execution``.
    :param verify_execution_description: when truthy, also snapshot the
        ``describe_execution`` response under ``describe_execution``.
    :return: ARN of the started execution.
    """
    stepfunctions_client = target_aws_client.stepfunctions
    exec_resp = stepfunctions_client.start_execution(
        stateMachineArn=state_machine_arn, input=execution_input
    )
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_exec_arn(exec_resp, 0))
    execution_arn = exec_resp["executionArn"]

    await_execution_terminated(
        stepfunctions_client=stepfunctions_client, execution_arn=execution_arn
    )

    if verify_execution_description:
        describe_execution = stepfunctions_client.describe_execution(executionArn=execution_arn)
        sfn_snapshot.match("describe_execution", describe_execution)

    get_execution_history = stepfunctions_client.get_execution_history(executionArn=execution_arn)

    # Transform all map runs if any.
    try:
        map_run_arns = extract_json("$..mapRunArn", get_execution_history)
        if isinstance(map_run_arns, str):
            map_run_arns = [map_run_arns]
        # De-duplicate in first-occurrence order. A set's iteration order
        # varies with string hash randomization, which made the index given to
        # each map run differ between runs.
        for i, map_run_arn in enumerate(dict.fromkeys(map_run_arns)):
            sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_map_run_arn(map_run_arn, i))
    except NoSuchJsonPathError:
        # No mapRunArns
        pass

    sfn_snapshot.match("get_execution_history", get_execution_history)

    return execution_arn
472

473

474
def launch_and_record_mocked_execution(
    target_aws_client,
    sfn_snapshot,
    state_machine_arn,
    execution_input,
    test_name,
) -> LongArn:
    """Start a mocked execution, wait for it to terminate and snapshot its history.

    The execution is started against ``"<state_machine_arn>#<test_name>"``.

    :param target_aws_client: client factory exposing a ``stepfunctions`` client.
    :param sfn_snapshot: snapshot object receiving transformers and matches.
    :param state_machine_arn: ARN of the state machine to execute.
    :param execution_input: input string passed to ``start_execution``.
    :param test_name: name appended to the state machine ARN after ``#``.
    :return: ARN of the started execution.
    """
    stepfunctions_client = target_aws_client.stepfunctions
    exec_resp = stepfunctions_client.start_execution(
        stateMachineArn=f"{state_machine_arn}#{test_name}", input=execution_input
    )
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_exec_arn(exec_resp, 0))
    execution_arn = exec_resp["executionArn"]

    await_execution_terminated(
        stepfunctions_client=stepfunctions_client, execution_arn=execution_arn
    )

    get_execution_history = stepfunctions_client.get_execution_history(executionArn=execution_arn)

    # Transform all map runs if any.
    try:
        map_run_arns = extract_json("$..mapRunArn", get_execution_history)
        if isinstance(map_run_arns, str):
            map_run_arns = [map_run_arns]
        # De-duplicate in first-occurrence order. A set's iteration order
        # varies with string hash randomization, which made the index given to
        # each map run differ between runs.
        for i, map_run_arn in enumerate(dict.fromkeys(map_run_arns)):
            sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_map_run_arn(map_run_arn, i))
    except NoSuchJsonPathError:
        # No mapRunArns
        pass

    sfn_snapshot.match("get_execution_history", get_execution_history)

    return execution_arn
508

509

510
def launch_and_record_logs(
    target_aws_client,
    state_machine_arn,
    execution_input,
    log_level,
    log_group_name,
    sfn_snapshot,
):
    """Run an execution and snapshot the events it wrote to its log group.

    The execution is launched and recorded first. If ``log_level`` is OFF or
    no history event is expected to be logged, the function returns right
    after that. Otherwise it waits until the log stream holds as many events
    as expected and snapshots them under ``logged_execution_events``.
    """
    execution_arn = launch_and_record_execution(
        target_aws_client=target_aws_client,
        sfn_snapshot=sfn_snapshot,
        state_machine_arn=state_machine_arn,
        execution_input=execution_input,
    )
    expected_events = get_expected_execution_logs(
        target_aws_client.stepfunctions, log_level, execution_arn
    )

    nothing_to_log = log_level == LogLevel.OFF or not expected_events
    if nothing_to_log:
        # Stop here: no log streams would have been created for this execution.
        return

    logged_execution_events = await_on_execution_logs(
        target_aws_client,
        log_group_name,
        is_execution_logs_list_complete(expected_events),
    )

    timestamp_transformer = JsonpathTransformer(
        jsonpath="$..event_timestamp",
        replacement="timestamp",
        replace_reference=False,
    )
    sfn_snapshot.add_transformer(timestamp_transformer)
    sfn_snapshot.match("logged_execution_events", logged_execution_events)
545

546

547
def create_and_record_execution(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    sfn_snapshot,
    definition,
    execution_input,
    verify_execution_description=False,
) -> LongArn:
    """Create a state machine with a fresh IAM role, then run and record one execution.

    :return: ARN of the execution, as returned by ``launch_and_record_execution``.
    """
    state_machine_arn = create_state_machine_with_iam_role(
        target_aws_client=target_aws_client,
        create_state_machine_iam_role=create_state_machine_iam_role,
        create_state_machine=create_state_machine,
        snapshot=sfn_snapshot,
        definition=definition,
    )
    return launch_and_record_execution(
        target_aws_client=target_aws_client,
        sfn_snapshot=sfn_snapshot,
        state_machine_arn=state_machine_arn,
        execution_input=execution_input,
        verify_execution_description=verify_execution_description,
    )
571

572

573
def create_and_record_mocked_execution(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    sfn_snapshot,
    definition,
    execution_input,
    state_machine_name,
    test_name,
) -> LongArn:
    """Create a named state machine, then run and record one mocked execution.

    :return: ARN of the execution, as returned by
        ``launch_and_record_mocked_execution``.
    """
    state_machine_arn = create_state_machine_with_iam_role(
        target_aws_client=target_aws_client,
        create_state_machine_iam_role=create_state_machine_iam_role,
        create_state_machine=create_state_machine,
        snapshot=sfn_snapshot,
        definition=definition,
        state_machine_name=state_machine_name,
    )
    return launch_and_record_mocked_execution(
        target_aws_client=target_aws_client,
        sfn_snapshot=sfn_snapshot,
        state_machine_arn=state_machine_arn,
        execution_input=execution_input,
        test_name=test_name,
    )
595

596

597
def create_and_run_mock(
    target_aws_client,
    monkeypatch,
    mock_config_file,
    mock_config: dict,
    state_machine_name: str,
    definition_template: dict,
    execution_input: str,
    test_name: str,
):
    """Run one mocked execution against a temporary state machine.

    Writes the mock configuration, points ``config.SFN_MOCK_CONFIG`` at it,
    creates a state machine, starts the ``<arn>#<test_name>`` execution, waits
    for it to terminate and deletes the state machine again.

    :param target_aws_client: client factory exposing a ``stepfunctions`` client.
    :param monkeypatch: object used to set ``config.SFN_MOCK_CONFIG``.
    :param mock_config_file: callable turning ``mock_config`` into a file path.
    :param mock_config: mock configuration content.
    :param state_machine_name: name to use; a generated name is used when falsy.
    :param definition_template: state machine definition, JSON-encoded before use.
    :param execution_input: input string passed to ``start_execution``.
    :param test_name: name appended to the state machine ARN after ``#``.
    :return: ARN of the started execution.
    """
    mock_config_file_path = mock_config_file(mock_config)
    monkeypatch.setattr(config, "SFN_MOCK_CONFIG", mock_config_file_path)

    sfn_client = target_aws_client.stepfunctions

    state_machine_name: str = state_machine_name or f"mocked_statemachine_{short_uid()}"
    definition = json.dumps(definition_template)
    creation_response = sfn_client.create_state_machine(
        name=state_machine_name,
        definition=definition,
        roleArn="arn:aws:iam::111111111111:role/mock-role/mocked-run",
    )
    state_machine_arn = creation_response["stateMachineArn"]

    try:
        test_case_arn = f"{state_machine_arn}#{test_name}"
        execution = sfn_client.start_execution(stateMachineArn=test_case_arn, input=execution_input)
        execution_arn = execution["executionArn"]

        await_execution_terminated(stepfunctions_client=sfn_client, execution_arn=execution_arn)
    finally:
        # Delete the state machine even when starting or awaiting the execution
        # fails, so a failing test does not leave it behind.
        sfn_client.delete_state_machine(stateMachineArn=state_machine_arn)

    return execution_arn
629

630

631
def create_and_record_logs(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    sfn_create_log_group,
    sfn_snapshot,
    definition,
    execution_input,
    log_level: LogLevel,
    include_execution_data: bool,
):
    """Create a state machine, attach CloudWatch logging, then run and record its logs.

    The state machine is created without logging; a new log group is then
    created and attached through ``update_state_machine`` before the execution
    is launched via ``launch_and_record_logs``.
    """
    state_machine_arn = create_state_machine_with_iam_role(
        target_aws_client=target_aws_client,
        create_state_machine_iam_role=create_state_machine_iam_role,
        create_state_machine=create_state_machine,
        snapshot=sfn_snapshot,
        definition=definition,
    )

    log_group_name = sfn_create_log_group()
    described = target_aws_client.logs.describe_log_groups(logGroupNamePrefix=log_group_name)
    # The first group matching the name prefix is taken to be the new one.
    log_group_arn = described["logGroups"][0]["arn"]

    destination = LogDestination(
        cloudWatchLogsLogGroup=CloudWatchLogsLogGroup(logGroupArn=log_group_arn)
    )
    logging_configuration = LoggingConfiguration(
        level=log_level,
        includeExecutionData=include_execution_data,
        destinations=[destination],
    )
    target_aws_client.stepfunctions.update_state_machine(
        stateMachineArn=state_machine_arn, loggingConfiguration=logging_configuration
    )

    launch_and_record_logs(
        target_aws_client=target_aws_client,
        state_machine_arn=state_machine_arn,
        execution_input=execution_input,
        log_level=log_level,
        log_group_name=log_group_name,
        sfn_snapshot=sfn_snapshot,
    )
675

676

677
def launch_and_record_sync_execution(
    target_aws_client,
    sfn_snapshot,
    state_machine_arn,
    execution_input,
):
    """Run a synchronous execution and snapshot its response.

    The ``start_sync_execution`` response is matched under
    ``start_execution_sync_response`` after registering a transformer for its
    execution ARN.
    """
    sfn_client = target_aws_client.stepfunctions
    sync_response = sfn_client.start_sync_execution(
        stateMachineArn=state_machine_arn,
        input=execution_input,
    )
    arn_transformer = sfn_snapshot.transform.sfn_sm_sync_exec_arn(sync_response, 0)
    sfn_snapshot.add_transformer(arn_transformer)
    sfn_snapshot.match("start_execution_sync_response", sync_response)
689

690

691
def create_and_record_express_sync_execution(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    sfn_snapshot,
    definition,
    execution_input,
):
    """Create an EXPRESS state machine, then run and record one synchronous execution.

    The creation response is snapshotted under ``creation_response`` before the
    execution is launched via ``launch_and_record_sync_execution``.
    """
    snf_role_arn = create_state_machine_iam_role(target_aws_client=target_aws_client)
    sfn_snapshot.add_transformer(RegexTransformer(snf_role_arn, "sfn_role_arn"))

    create_arguments = {
        "name": f"express_statemachine_{short_uid()}",
        "definition": definition,
        "roleArn": snf_role_arn,
        "type": StateMachineType.EXPRESS,
    }
    creation_response = create_state_machine(target_aws_client, **create_arguments)
    state_machine_arn = creation_response["stateMachineArn"]
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_create_arn(creation_response, 0))
    sfn_snapshot.match("creation_response", creation_response)

    launch_and_record_sync_execution(
        target_aws_client=target_aws_client,
        sfn_snapshot=sfn_snapshot,
        state_machine_arn=state_machine_arn,
        execution_input=execution_input,
    )
719

720

721
def launch_and_record_express_async_execution(
    target_aws_client,
    sfn_snapshot,
    state_machine_arn,
    log_group_name,
    execution_input,
):
    """Start an express execution and snapshot the terminal event from its logs.

    Waits until the last logged event is terminal, then matches only that
    event under ``end_event``.

    :return: ARN of the started execution.
    """
    sfn_client = target_aws_client.stepfunctions
    start_response = sfn_client.start_execution(
        stateMachineArn=state_machine_arn, input=execution_input
    )
    arn_transformer = sfn_snapshot.transform.sfn_sm_express_exec_arn(start_response, 0)
    sfn_snapshot.add_transformer(arn_transformer)
    execution_arn = start_response["executionArn"]

    logged_events = await_on_execution_logs(
        target_aws_client, log_group_name, validation_function=_is_last_history_event_terminal
    )
    # Snapshot only the end event, as AWS StepFunctions logs the preceding
    # events in a flaky manner.
    sfn_snapshot.match("end_event", logged_events[-1])

    return execution_arn
742

743

744
def create_and_record_express_async_execution(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    sfn_create_log_group,
    sfn_snapshot,
    definition,
    execution_input,
    include_execution_data: bool = True,
) -> tuple[LongArn, LongArn]:
    """Create a logging EXPRESS state machine, then run and record one async execution.

    A new log group is created and attached at log level ALL when the state
    machine is created. The creation response is snapshotted under
    ``creation_response``.

    :return: ``(state_machine_arn, execution_arn)``.
    """
    snf_role_arn = create_state_machine_iam_role(target_aws_client)
    sfn_snapshot.add_transformer(RegexTransformer(snf_role_arn, "sfn_role_arn"))

    log_group_name = sfn_create_log_group()
    described = target_aws_client.logs.describe_log_groups(logGroupNamePrefix=log_group_name)
    # The first group matching the name prefix is taken to be the new one.
    log_group_arn = described["logGroups"][0]["arn"]

    destination = LogDestination(
        cloudWatchLogsLogGroup=CloudWatchLogsLogGroup(logGroupArn=log_group_arn)
    )
    logging_configuration = LoggingConfiguration(
        level=LogLevel.ALL,
        includeExecutionData=include_execution_data,
        destinations=[destination],
    )

    create_arguments = {
        "name": f"express_statemachine_{short_uid()}",
        "definition": definition,
        "roleArn": snf_role_arn,
        "type": StateMachineType.EXPRESS,
        "loggingConfiguration": logging_configuration,
    }
    creation_response = create_state_machine(target_aws_client, **create_arguments)
    state_machine_arn = creation_response["stateMachineArn"]
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_create_arn(creation_response, 0))
    sfn_snapshot.match("creation_response", creation_response)

    execution_arn = launch_and_record_express_async_execution(
        target_aws_client=target_aws_client,
        sfn_snapshot=sfn_snapshot,
        state_machine_arn=state_machine_arn,
        log_group_name=log_group_name,
        execution_input=execution_input,
    )
    return state_machine_arn, execution_arn
791

792

793
def create_and_record_events(
    create_state_machine_iam_role,
    create_state_machine,
    sfn_events_to_sqs_queue,
    target_aws_client,
    sfn_snapshot,
    definition,
    execution_input,
):
    """
    Run a state machine execution and snapshot the events it delivered to an SQS queue.

    A state machine is created from ``definition``, ``sfn_events_to_sqs_queue`` is asked for a
    queue URL for that state machine, an execution is started with ``execution_input`` and awaited
    until terminated. The queue is then polled (for up to 60 seconds) until the latest collected
    event, ordered by its "time" field, no longer reports the status "RUNNING". The collected
    events are snapshotted under the key "stepfunctions_events".

    If polling times out, a warning is logged and whatever was collected is still snapshotted, so
    that the snapshot mismatch can be traced back to the timeout.

    :param create_state_machine_iam_role: callable returning the role ARN for the state machine.
    :param create_state_machine: callable creating the state machine and returning its response.
    :param sfn_events_to_sqs_queue: callable returning the URL of the queue receiving the events.
    :param target_aws_client: client factory exposing the ``stepfunctions`` and ``sqs`` clients.
    :param sfn_snapshot: snapshot object used to register transformers and record matches.
    :param definition: state machine definition passed through to ``create_state_machine``.
    :param execution_input: input string passed to ``start_execution``.
    """
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sqs_integration())
    sfn_snapshot.add_transformers_list(
        [
            JsonpathTransformer(
                jsonpath="$..detail.startDate",
                replacement="start-date",
                replace_reference=False,
            ),
            JsonpathTransformer(
                jsonpath="$..detail.stopDate",
                replacement="stop-date",
                replace_reference=False,
            ),
            JsonpathTransformer(
                jsonpath="$..detail.name",
                # This is deliberately left as a plain (non f-) string: the replacement is a
                # snapshot placeholder and must be identical on every run. Interpolating
                # short_uid() here would make the recorded value random.
                replacement="test_event_bridge_events-{short_uid()}",
                replace_reference=False,
            ),
        ]
    )

    snf_role_arn = create_state_machine_iam_role(target_aws_client)
    create_output: CreateStateMachineOutput = create_state_machine(
        target_aws_client,
        name=f"test_event_bridge_events-{short_uid()}",
        definition=definition,
        roleArn=snf_role_arn,
    )
    state_machine_arn = create_output["stateMachineArn"]

    queue_url = sfn_events_to_sqs_queue(state_machine_arn=state_machine_arn)

    start_execution = target_aws_client.stepfunctions.start_execution(
        stateMachineArn=state_machine_arn, input=execution_input
    )
    execution_arn = start_execution["executionArn"]
    await_execution_terminated(
        stepfunctions_client=target_aws_client.stepfunctions, execution_arn=execution_arn
    )

    stepfunctions_events = []

    def _get_events():
        # Accumulate across polls: each receive_message call may return only some of the messages.
        received = target_aws_client.sqs.receive_message(QueueUrl=queue_url)
        for message in received.get("Messages", []):
            stepfunctions_events.append(json.loads(message["Body"]))
        stepfunctions_events.sort(key=lambda e: e["time"])
        # Done once the most recent event reports a status other than RUNNING.
        return bool(stepfunctions_events) and (
            stepfunctions_events[-1]["detail"]["status"] != "RUNNING"
        )

    terminal_event_received = poll_condition(_get_events, timeout=60)
    if not terminal_event_received:
        LOG.warning(
            "Timed out whilst awaiting a non-RUNNING Step Functions event on queue '%s' "
            "for execution '%s'; recording %d collected event(s).",
            queue_url,
            execution_arn,
            len(stepfunctions_events),
        )

    sfn_snapshot.match("stepfunctions_events", stepfunctions_events)
855

856

857
def record_sqs_events(target_aws_client, queue_url, sfn_snapshot, num_events):
    """
    Collect ``num_events`` JSON messages from an SQS queue and snapshot them.

    The queue is polled (for up to 60 seconds) until exactly ``num_events`` message bodies have
    been collected. The events are then ordered by the JSON dump of their "detail" field and
    snapshotted under the key "stepfunctions_events".

    If polling times out, a warning is logged and whatever was collected is still snapshotted, so
    that the snapshot mismatch can be traced back to the timeout.

    :param target_aws_client: client factory exposing the ``sqs`` client.
    :param queue_url: URL of the queue to read the events from.
    :param sfn_snapshot: snapshot object used to record the match.
    :param num_events: number of events expected on the queue.
    """
    stepfunctions_events = []

    def _get_events():
        # Accumulate across polls: each receive_message call may return only some of the messages.
        received = target_aws_client.sqs.receive_message(QueueUrl=queue_url)
        for message in received.get("Messages", []):
            stepfunctions_events.append(json.loads(message["Body"]))
        stepfunctions_events.sort(key=lambda e: e["time"])
        return len(stepfunctions_events) == num_events

    all_events_received = poll_condition(_get_events, timeout=60)
    if not all_events_received:
        LOG.warning(
            "Timed out whilst awaiting %s event(s) on queue '%s'; recording %d collected event(s).",
            num_events,
            queue_url,
            len(stepfunctions_events),
        )

    # Final ordering is by event content rather than by time.
    stepfunctions_events.sort(key=lambda e: json.dumps(e.get("detail", {})))
    sfn_snapshot.match("stepfunctions_events", stepfunctions_events)
871

872

873
class SfnNoneRecursiveParallelTransformer:
    """
    Normalises the sublist of events triggered by a Parallel state to be order-independent.

    Events located between a ``ParallelStateStarted`` event and the next terminal Parallel state
    event (succeeded, aborted, exited or failed) get their ``id`` and ``previousEventId``
    overwritten with placeholders and are then sorted by their JSON string form.
    Nesting is not handled recursively: a further ``ParallelStateStarted`` event simply restarts
    the collection from that point.
    """

    def __init__(self, events_jsonpath: str = "$..events"):
        """
        :param events_jsonpath: jsonpath expression locating the event lists to normalise.
        """
        self.events_jsonpath: str = events_jsonpath

    @staticmethod
    def _normalise_events(events: list[dict]) -> None:
        """
        Reorder, in place, the events emitted inside each Parallel state of ``events``.

        The list length is never changed: a collected sublist is only written back while a
        collection is open, in which case it has exactly the length of the slice it replaces.

        :param events: list of history event dicts; modified in place.
        """
        terminal_parallel_types = {
            HistoryEventType.ParallelStateSucceeded,
            HistoryEventType.ParallelStateAborted,
            HistoryEventType.ParallelStateExited,
            HistoryEventType.ParallelStateFailed,
        }

        start_idx = None
        sublist = []
        in_sublist = False
        for i, event in enumerate(events):
            event_type = event.get("type")
            if event_type is None:
                LOG.debug(
                    "No 'type' in event item '%s'.",
                    event,
                )
                # The collected sublist no longer mirrors events[start_idx:i]; abandon it.
                in_sublist = False

            elif event_type in terminal_parallel_types:
                # Only write back while a collection is open. Several terminal events can follow
                # one another (e.g. succeeded, then exited); re-assigning the stale sublist over a
                # wider slice would drop the events in between, and a terminal event with no
                # preceding start (start_idx is None) would truncate the head of the list.
                if in_sublist:
                    events[start_idx:i] = sorted(sublist, key=to_json_str)
                in_sublist = False
            elif event_type == HistoryEventType.ParallelStateStarted:
                in_sublist = True
                sublist = []
                start_idx = i + 1
            elif in_sublist:
                # Event ids depend on the branch interleaving, so they are blanked before sorting.
                # NOTE(review): the id placeholder is a one-element tuple, not the int 0 - possibly
                # a stray comma. Kept unchanged as recorded snapshots presumably depend on it.
                event["id"] = (0,)
                event["previousEventId"] = 0
                sublist.append(event)

    def transform(self, input_data: dict, *, ctx: TransformContext) -> dict:
        """
        Normalise every event list matched by ``self.events_jsonpath`` within ``input_data``.

        :param input_data: the data to transform; matched event lists are modified in place.
        :param ctx: the transform context; not used by this transformer.
        :return: the same ``input_data`` object.
        """
        matches = parse(self.events_jsonpath).find(input_data)
        if not matches:
            LOG.debug("No Stepfunctions 'events' for jsonpath '%s'.", self.events_jsonpath)
            return input_data

        for events_data in matches:
            self._normalise_events(events_data.value)

        return input_data
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc