• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 2a469994-6277-4cb6-9b01-d09563cb7cbb

03 Mar 2025 10:41PM UTC coverage: 86.862% (-0.03%) from 86.891%
2a469994-6277-4cb6-9b01-d09563cb7cbb

push

circleci

web-flow
APIGW: add validation for AWS ARN in PutIntegration (#12324)

11 of 13 new or added lines in 1 file covered. (84.62%)

55 existing lines in 12 files now uncovered.

61890 of 71251 relevant lines covered (86.86%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.55
/localstack-core/localstack/testing/pytest/stepfunctions/utils.py
1
import json
1✔
2
import logging
1✔
3
from typing import Callable, Final, Optional
1✔
4

5
from botocore.exceptions import ClientError
1✔
6
from jsonpath_ng.ext import parse
1✔
7
from localstack_snapshot.snapshots.transformer import (
1✔
8
    JsonpathTransformer,
9
    RegexTransformer,
10
    TransformContext,
11
)
12

13
from localstack.aws.api.stepfunctions import (
1✔
14
    CloudWatchLogsLogGroup,
15
    CreateStateMachineOutput,
16
    Definition,
17
    ExecutionStatus,
18
    HistoryEventList,
19
    HistoryEventType,
20
    LogDestination,
21
    LoggingConfiguration,
22
    LogLevel,
23
    LongArn,
24
    StateMachineType,
25
)
26
from localstack.services.stepfunctions.asl.eval.event.logging import is_logging_enabled_for
1✔
27
from localstack.services.stepfunctions.asl.utils.encoding import to_json_str
1✔
28
from localstack.services.stepfunctions.asl.utils.json_path import NoSuchJsonPathError, extract_json
1✔
29
from localstack.utils.strings import short_uid
1✔
30
from localstack.utils.sync import poll_condition
1✔
31

32
LOG = logging.getLogger(__name__)
1✔
33

34

35
# For EXPRESS state machines, the deletion will happen eventually (usually less than a minute).
36
# Running executions may emit logs after DeleteStateMachine API is called.
37
_DELETION_TIMEOUT_SECS: Final[int] = 120
1✔
38

39

40
def await_no_state_machines_listed(stepfunctions_client):
    """Poll ``list_state_machines`` until the response contains no state machines.

    Polling uses ``interval=1`` and ``timeout=_DELETION_TIMEOUT_SECS``. A timeout is
    only reported through a warning log entry; nothing is raised or returned.
    """

    def _no_state_machines_left():
        listing = stepfunctions_client.list_state_machines()
        return not listing["stateMachines"]

    listing_is_empty = poll_condition(
        condition=_no_state_machines_left,
        timeout=_DELETION_TIMEOUT_SECS,
        interval=1,
    )
    if listing_is_empty:
        return
    LOG.warning("Timed out whilst awaiting for listing to be empty.")
53

54

55
def _is_state_machine_listed(stepfunctions_client, state_machine_arn: str) -> bool:
    """Return whether ``state_machine_arn`` appears in a ``list_state_machines`` response.

    Only the single response returned by one call is inspected; no pagination
    token is followed.
    """
    listed_machines = stepfunctions_client.list_state_machines()["stateMachines"]
    return any(entry["stateMachineArn"] == state_machine_arn for entry in listed_machines)
62

63

64
def _is_state_machine_version_listed(
    stepfunctions_client, state_machine_arn: str, state_machine_version_arn: str
) -> bool:
    """Return whether a version ARN is listed for the given state machine.

    Calls ``list_state_machine_versions`` once for ``state_machine_arn`` and looks for
    ``state_machine_version_arn`` among the returned ``stateMachineVersions``.
    """
    listed_versions = stepfunctions_client.list_state_machine_versions(
        stateMachineArn=state_machine_arn
    )["stateMachineVersions"]
    return any(
        entry["stateMachineVersionArn"] == state_machine_version_arn
        for entry in listed_versions
    )
73

74

75
def await_state_machine_not_listed(stepfunctions_client, state_machine_arn: str):
    """Poll until ``state_machine_arn`` no longer shows up in ``list_state_machines``.

    Uses ``interval=1`` and ``timeout=_DELETION_TIMEOUT_SECS``; on timeout a warning
    is logged and the function returns normally.
    """

    def _is_absent() -> bool:
        return not _is_state_machine_listed(stepfunctions_client, state_machine_arn)

    is_absent = poll_condition(
        condition=_is_absent,
        timeout=_DELETION_TIMEOUT_SECS,
        interval=1,
    )
    if is_absent:
        return
    LOG.warning("Timed out whilst awaiting for listing to exclude '%s'.", state_machine_arn)
83

84

85
def await_state_machine_listed(stepfunctions_client, state_machine_arn: str):
    """Poll until ``state_machine_arn`` shows up in ``list_state_machines``.

    Uses ``interval=1`` and ``timeout=_DELETION_TIMEOUT_SECS``; on timeout a warning
    is logged and the function returns normally.
    """

    def _is_present() -> bool:
        return _is_state_machine_listed(stepfunctions_client, state_machine_arn)

    is_present = poll_condition(
        condition=_is_present,
        timeout=_DELETION_TIMEOUT_SECS,
        interval=1,
    )
    if is_present:
        return
    LOG.warning("Timed out whilst awaiting for listing to include '%s'.", state_machine_arn)
93

94

95
def await_state_machine_version_not_listed(
    stepfunctions_client, state_machine_arn: str, state_machine_version_arn: str
):
    """Poll until the version ARN is no longer listed for the state machine.

    Uses ``interval=1`` and ``timeout=_DELETION_TIMEOUT_SECS``; on timeout a warning
    is logged and the function returns normally.
    """

    def _version_is_absent() -> bool:
        return not _is_state_machine_version_listed(
            stepfunctions_client, state_machine_arn, state_machine_version_arn
        )

    version_is_absent = poll_condition(
        condition=_version_is_absent,
        timeout=_DELETION_TIMEOUT_SECS,
        interval=1,
    )
    if version_is_absent:
        return
    LOG.warning(
        "Timed out whilst awaiting for version of %s to exclude '%s'.",
        state_machine_arn,
        state_machine_version_arn,
    )
111

112

113
def await_state_machine_version_listed(
    stepfunctions_client, state_machine_arn: str, state_machine_version_arn: str
):
    """Poll until the version ARN is listed for the state machine.

    Uses ``interval=1`` and ``timeout=_DELETION_TIMEOUT_SECS``; on timeout a warning
    is logged and the function returns normally.
    """

    def _version_is_present() -> bool:
        return _is_state_machine_version_listed(
            stepfunctions_client, state_machine_arn, state_machine_version_arn
        )

    version_is_present = poll_condition(
        condition=_version_is_present,
        timeout=_DELETION_TIMEOUT_SECS,
        interval=1,
    )
    if version_is_present:
        return
    LOG.warning(
        "Timed out whilst awaiting for version of %s to include '%s'.",
        state_machine_arn,
        state_machine_version_arn,
    )
129

130

131
def await_on_execution_events(
    stepfunctions_client, execution_arn: str, check_func: Callable[[HistoryEventList], bool]
) -> HistoryEventList:
    """Poll an execution's history until ``check_func`` accepts it.

    On every poll the history is fetched with ``get_execution_history``, sorted by each
    event's ``timestamp`` value and handed to ``check_func``. A ``ClientError`` from the
    fetch counts as "not yet satisfied".

    Returns:
        The timestamp-sorted events from the poll that satisfied ``check_func``.

    Raises:
        AssertionError: if polling (``timeout=120``, ``interval=1``) ends unsuccessfully.
    """
    collected: HistoryEventList = []

    def _refresh_and_check():
        # Drop the previous snapshot first so a failed fetch leaves an empty list.
        collected.clear()
        try:
            history = stepfunctions_client.get_execution_history(executionArn=execution_arn)
        except ClientError:
            return False
        collected[:] = sorted(
            history.get("events", []), key=lambda history_event: history_event.get("timestamp")
        )
        return check_func(collected)

    condition_met = poll_condition(condition=_refresh_and_check, timeout=120, interval=1)
    assert condition_met
    return collected
149

150

151
def await_execution_success(stepfunctions_client, execution_arn: str) -> HistoryEventList:
    """Wait until the last history event carries ``executionSucceededEventDetails``.

    Delegates the polling to ``await_on_execution_events`` and returns its event list.
    """

    def _ends_with_success(events: HistoryEventList) -> bool:
        # An empty history cannot have succeeded yet.
        return bool(events) and "executionSucceededEventDetails" in events[-1]

    return await_on_execution_events(
        stepfunctions_client=stepfunctions_client,
        execution_arn=execution_arn,
        check_func=_ends_with_success,
    )
163

164

165
def await_list_execution_status(
    stepfunctions_client, state_machine_arn: str, execution_arn: str, status: str
):
    """Poll ``list_executions`` until the execution is listed with the given status.

    Required as there is some eventual consistency in list_executions vs
    describe_execution and get_execution_history. Uses ``timeout=120`` and
    ``interval=1``; on timeout a warning is logged and the function returns normally.
    """

    def _is_listed_with_status():
        listing = stepfunctions_client.list_executions(
            stateMachineArn=state_machine_arn, statusFilter=status
        )
        return any(
            entry["executionArn"] == execution_arn and entry["status"] == status
            for entry in listing.get("executions", [])
        )

    if poll_condition(condition=_is_listed_with_status, timeout=120, interval=1):
        return
    LOG.warning(
        "Timed out whilst awaiting for execution status %s to satisfy condition for execution '%s'.",
        status,
        execution_arn,
    )
187

188

189
def _is_last_history_event_terminal(events: HistoryEventList) -> bool:
    """Return whether the final event in ``events`` marks the end of an execution.

    An empty list is never terminal. A last event without a ``type`` key is treated
    as terminal, as are the ExecutionFailed / ExecutionAborted / ExecutionTimedOut /
    ExecutionSucceeded event types.
    """
    if not events:
        return False
    final_event_type = events[-1].get("type")
    if final_event_type is None:
        return True
    terminal_types = {
        HistoryEventType.ExecutionFailed,
        HistoryEventType.ExecutionAborted,
        HistoryEventType.ExecutionTimedOut,
        HistoryEventType.ExecutionSucceeded,
    }
    return final_event_type in terminal_types
200

201

202
def await_execution_terminated(stepfunctions_client, execution_arn: str) -> HistoryEventList:
    """Wait until the execution's history ends with a terminal event.

    Thin wrapper around ``await_on_execution_events`` that uses
    ``_is_last_history_event_terminal`` as the check; returns the resulting events.
    """
    terminated_events = await_on_execution_events(
        stepfunctions_client=stepfunctions_client,
        execution_arn=execution_arn,
        check_func=_is_last_history_event_terminal,
    )
    return terminated_events
208

209

210
def await_execution_lists_terminated(
    stepfunctions_client, state_machine_arn: str, execution_arn: str
):
    """Poll ``list_executions`` until the execution is listed with a non-RUNNING status.

    Uses ``timeout=120`` and ``interval=1``; on timeout a warning is logged and the
    function returns normally.
    """

    def _is_no_longer_running() -> bool:
        listed = stepfunctions_client.list_executions(stateMachineArn=state_machine_arn)[
            "executions"
        ]
        for entry in listed:
            if entry["executionArn"] != execution_arn:
                continue
            # First matching entry decides the outcome.
            return entry["status"] != ExecutionStatus.RUNNING
        # The execution is not listed (yet).
        return False

    if poll_condition(condition=_is_no_longer_running, timeout=120, interval=1):
        return
    LOG.warning(
        "Timed out whilst awaiting for execution events to satisfy condition for execution '%s'.",
        execution_arn,
    )
227

228

229
def await_execution_started(stepfunctions_client, execution_arn: str) -> HistoryEventList:
    """Wait until the execution history contains an execution-started event.

    Delegates the polling to ``await_on_execution_events``.

    Returns:
        The history events from the poll in which an event carrying
        ``executionStartedEventDetails`` was found.
    """

    def _check_started_exists(events: HistoryEventList) -> bool:
        # Inspect every event: the previous implementation returned from inside the
        # loop and therefore only ever looked at the first one.
        return any("executionStartedEventDetails" in event for event in events)

    return await_on_execution_events(
        stepfunctions_client=stepfunctions_client,
        execution_arn=execution_arn,
        check_func=_check_started_exists,
    )
240

241

242
def await_execution_aborted(stepfunctions_client, execution_arn: str):
    """Poll ``describe_execution`` until the execution status is ABORTED.

    Uses ``timeout=120`` and ``interval=1``; on timeout a warning is logged and the
    function returns normally.
    """

    def _is_aborted():
        description = stepfunctions_client.describe_execution(executionArn=execution_arn)
        return description["status"] == ExecutionStatus.ABORTED

    if poll_condition(condition=_is_aborted, timeout=120, interval=1):
        return
    LOG.warning("Timed out whilst awaiting for execution '%s' to abort.", execution_arn)
251

252

253
def get_expected_execution_logs(
    stepfunctions_client, log_level: LogLevel, execution_arn: LongArn
) -> HistoryEventList:
    """Return the history events that ``is_logging_enabled_for`` accepts at ``log_level``.

    The history is fetched once with ``get_execution_history`` and the original event
    order is preserved.
    """
    history = stepfunctions_client.get_execution_history(executionArn=execution_arn)
    loggable_events = []
    for history_event in history["events"]:
        if is_logging_enabled_for(log_level=log_level, history_event_type=history_event["type"]):
            loggable_events.append(history_event)
    return loggable_events
264

265

266
def is_execution_logs_list_complete(
    expected_events: HistoryEventList,
) -> Callable[[HistoryEventList], bool]:
    """Build a predicate that tells whether a list of log events is complete.

    The returned function accepts any input when ``expected_events`` is empty;
    otherwise it only compares the number of log events with the number of expected
    events (contents are not compared).
    """

    def _has_all_expected(log_events: list) -> bool:
        return not expected_events or len(log_events) == len(expected_events)

    return _has_all_expected
275

276

277
def _await_on_execution_log_stream_created(target_aws_client, log_group_name: str) -> str:
    """Wait for a log stream other than the AWS validation stream and return its name.

    Each poll calls ``describe_log_streams`` for ``log_group_name`` and looks at the
    last returned stream only. A ``ClientError`` counts as "not yet created".

    Raises:
        AssertionError: if ``poll_condition`` (called with its defaults) reports failure.
    """
    validation_stream_name = "log_stream_created_by_aws_to_validate_log_delivery_subscriptions"
    logs_client = target_aws_client.logs
    latest_stream_name = ""

    def _execution_stream_exists():
        nonlocal latest_stream_name
        try:
            streams = logs_client.describe_log_streams(logGroupName=log_group_name)[
                "logStreams"
            ]
            if not streams:
                return False
            latest_stream_name = streams[-1]["logStreamName"]
        except ClientError:
            return False
        # While only the validation stream exists, SFN has not yet created the log
        # stream for the execution.
        return latest_stream_name != validation_stream_name

    stream_created = poll_condition(condition=_execution_stream_exists)
    assert stream_created
    return latest_stream_name
303

304

305
def await_on_execution_logs(
    target_aws_client,
    log_group_name: str,
    validation_function: Optional[Callable[[HistoryEventList], bool]] = None,
) -> HistoryEventList:
    """Poll the execution's log stream until the logged events pass validation.

    The log stream name is first resolved through
    ``_await_on_execution_log_stream_created``. Each poll then reads the stream from
    its head with ``get_log_events`` and JSON-decodes every event's ``message``.
    A ``ClientError`` while reading counts as "not yet satisfied".

    Args:
        target_aws_client: client factory exposing a ``logs`` client.
        log_group_name: name of the log group to read from.
        validation_function: predicate applied to the decoded events. When ``None``
            (the default) no validation is performed and the first successful read
            is accepted; previously the default raised ``TypeError``.

    Returns:
        The decoded log events from the poll that passed validation.

    Raises:
        AssertionError: if ``poll_condition`` (called with its defaults) reports failure.
    """
    log_stream_name = _await_on_execution_log_stream_created(target_aws_client, log_group_name)

    logs_client = target_aws_client.logs
    events: HistoryEventList = []

    def _run_check():
        # Start from a clean list so that a failed read leaves no stale events behind.
        events.clear()
        try:
            log_events = logs_client.get_log_events(
                logGroupName=log_group_name, logStreamName=log_stream_name, startFromHead=True
            )["events"]
            events.extend([json.loads(log_event["message"]) for log_event in log_events])
        except ClientError:
            return False

        if validation_function is None:
            return True
        return validation_function(events)

    assert poll_condition(condition=_run_check)
    return events
331

332

333
def create_state_machine_with_iam_role(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    snapshot,
    definition: Definition,
    logging_configuration: Optional[LoggingConfiguration] = None,
):
    """Create an IAM role and a state machine, registering snapshot transformers.

    Registers regex transformers for the role ARN and for request-id strings, creates
    the state machine under a ``statemachine_create_and_record_execution_<uid>`` name
    (passing ``loggingConfiguration`` only when one is given), and registers the
    ``sfn_sm_create_arn`` transformer for the creation response.

    Returns:
        The ``stateMachineArn`` from the creation response.
    """
    snf_role_arn = create_state_machine_iam_role(target_aws_client=target_aws_client)

    masked_patterns = (
        (snf_role_arn, "snf_role_arn"),
        ("Extended Request ID: [a-zA-Z0-9-/=+]+", "Extended Request ID: <extended_request_id>"),
        ("Request ID: [a-zA-Z0-9-]+", "Request ID: <request_id>"),
    )
    for regex, replacement in masked_patterns:
        snapshot.add_transformer(RegexTransformer(regex, replacement))

    create_arguments = dict(
        name=f"statemachine_create_and_record_execution_{short_uid()}",
        definition=definition,
        roleArn=snf_role_arn,
    )
    if logging_configuration is not None:
        create_arguments["loggingConfiguration"] = logging_configuration

    creation_resp = create_state_machine(target_aws_client, **create_arguments)
    snapshot.add_transformer(snapshot.transform.sfn_sm_create_arn(creation_resp, 0))
    return creation_resp["stateMachineArn"]
365

366

367
def launch_and_record_execution(
    target_aws_client,
    sfn_snapshot,
    state_machine_arn,
    execution_input,
    verify_execution_description=False,
) -> LongArn:
    """Start an execution, wait for it to terminate and snapshot its history.

    Args:
        target_aws_client: client factory exposing a ``stepfunctions`` client.
        sfn_snapshot: snapshot fixture used for transformers and matching.
        state_machine_arn: ARN of the state machine to execute.
        execution_input: input passed to ``start_execution``.
        verify_execution_description: when true, the ``describe_execution`` response
            is snapshotted as ``describe_execution`` as well.

    Returns:
        The ARN of the started execution.
    """
    stepfunctions_client = target_aws_client.stepfunctions
    exec_resp = stepfunctions_client.start_execution(
        stateMachineArn=state_machine_arn, input=execution_input
    )
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_exec_arn(exec_resp, 0))
    execution_arn = exec_resp["executionArn"]

    await_execution_terminated(
        stepfunctions_client=stepfunctions_client, execution_arn=execution_arn
    )

    if verify_execution_description:
        describe_execution = stepfunctions_client.describe_execution(executionArn=execution_arn)
        sfn_snapshot.match("describe_execution", describe_execution)

    get_execution_history = stepfunctions_client.get_execution_history(executionArn=execution_arn)

    # Transform all map runs if any.
    try:
        map_run_arns = extract_json("$..mapRunArn", get_execution_history)
    except NoSuchJsonPathError:
        # No mapRunArns
        map_run_arns = []
    if isinstance(map_run_arns, str):
        map_run_arns = [map_run_arns]
    # De-duplicate in first-seen order: iterating a set of strings yields a
    # hash-dependent order, which made the transformer indices vary between runs.
    for i, map_run_arn in enumerate(dict.fromkeys(map_run_arns)):
        sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_map_run_arn(map_run_arn, i))

    sfn_snapshot.match("get_execution_history", get_execution_history)

    return execution_arn
405

406

407
def launch_and_record_logs(
    target_aws_client,
    state_machine_arn,
    execution_input,
    log_level,
    log_group_name,
    sfn_snapshot,
):
    """Run an execution and snapshot the events it logged to the log group.

    The execution is run through ``launch_and_record_execution``. Nothing further is
    recorded when ``log_level`` is ``LogLevel.OFF`` or no history event is expected to
    be logged; otherwise the function waits until as many events were logged as
    expected and snapshots them as ``logged_execution_events``.
    """
    execution_arn = launch_and_record_execution(
        target_aws_client,
        sfn_snapshot,
        state_machine_arn,
        execution_input,
    )
    expected_events = get_expected_execution_logs(
        target_aws_client.stepfunctions, log_level, execution_arn
    )

    logging_disabled = log_level == LogLevel.OFF
    if logging_disabled or not expected_events:
        # The test should terminate here, as no log streams for this execution would have been created.
        return

    logged_execution_events = await_on_execution_logs(
        target_aws_client,
        log_group_name,
        is_execution_logs_list_complete(expected_events),
    )

    timestamp_transformer = JsonpathTransformer(
        jsonpath="$..event_timestamp",
        replacement="timestamp",
        replace_reference=False,
    )
    sfn_snapshot.add_transformer(timestamp_transformer)
    sfn_snapshot.match("logged_execution_events", logged_execution_events)
442

443

444
# TODO: make this return the execution ARN for manual assertions
445
def create_and_record_execution(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    sfn_snapshot,
    definition,
    execution_input,
    verify_execution_description=False,
):
    """Create a state machine, run one execution of it and snapshot the outcome.

    Combines ``create_state_machine_with_iam_role`` and ``launch_and_record_execution``.
    Nothing is returned; the execution ARN is currently discarded.
    """
    new_state_machine_arn = create_state_machine_with_iam_role(
        target_aws_client,
        create_state_machine_iam_role,
        create_state_machine,
        sfn_snapshot,
        definition,
    )
    launch_and_record_execution(
        target_aws_client,
        sfn_snapshot,
        new_state_machine_arn,
        execution_input,
        verify_execution_description,
    )
468

469

470
def create_and_record_logs(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    sfn_create_log_group,
    sfn_snapshot,
    definition,
    execution_input,
    log_level: LogLevel,
    include_execution_data: bool,
):
    """Create a state machine with CloudWatch logging and snapshot its logged events.

    The state machine is created first, then a log group is created and attached via
    ``update_state_machine`` with the requested ``log_level`` and
    ``include_execution_data``. The run itself is delegated to ``launch_and_record_logs``.
    """
    state_machine_arn = create_state_machine_with_iam_role(
        target_aws_client,
        create_state_machine_iam_role,
        create_state_machine,
        sfn_snapshot,
        definition,
    )

    log_group_name = sfn_create_log_group()
    # The first log group returned for the name prefix is taken as the created one.
    described_groups = target_aws_client.logs.describe_log_groups(
        logGroupNamePrefix=log_group_name
    )
    log_group_arn = described_groups["logGroups"][0]["arn"]

    destination = LogDestination(
        cloudWatchLogsLogGroup=CloudWatchLogsLogGroup(logGroupArn=log_group_arn)
    )
    logging_configuration = LoggingConfiguration(
        level=log_level,
        includeExecutionData=include_execution_data,
        destinations=[destination],
    )
    target_aws_client.stepfunctions.update_state_machine(
        stateMachineArn=state_machine_arn, loggingConfiguration=logging_configuration
    )

    launch_and_record_logs(
        target_aws_client,
        state_machine_arn,
        execution_input,
        log_level,
        log_group_name,
        sfn_snapshot,
    )
514

515

516
def launch_and_record_sync_execution(
    target_aws_client,
    sfn_snapshot,
    state_machine_arn,
    execution_input,
):
    """Run ``start_sync_execution`` and snapshot its response.

    Registers the ``sfn_sm_sync_exec_arn`` transformer for the response before
    matching it as ``start_execution_sync_response``.
    """
    sync_response = target_aws_client.stepfunctions.start_sync_execution(
        stateMachineArn=state_machine_arn,
        input=execution_input,
    )
    arn_transformer = sfn_snapshot.transform.sfn_sm_sync_exec_arn(sync_response, 0)
    sfn_snapshot.add_transformer(arn_transformer)
    sfn_snapshot.match("start_execution_sync_response", sync_response)
528

529

530
def create_and_record_express_sync_execution(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    sfn_snapshot,
    definition,
    execution_input,
):
    """Create an EXPRESS state machine and snapshot one synchronous execution.

    The creation response is snapshotted as ``creation_response``; the execution
    itself is handled by ``launch_and_record_sync_execution``.
    """
    snf_role_arn = create_state_machine_iam_role(target_aws_client=target_aws_client)
    sfn_snapshot.add_transformer(RegexTransformer(snf_role_arn, "sfn_role_arn"))

    express_machine_name = f"express_statemachine_{short_uid()}"
    creation_response = create_state_machine(
        target_aws_client,
        name=express_machine_name,
        definition=definition,
        roleArn=snf_role_arn,
        type=StateMachineType.EXPRESS,
    )
    state_machine_arn = creation_response["stateMachineArn"]
    create_arn_transformer = sfn_snapshot.transform.sfn_sm_create_arn(creation_response, 0)
    sfn_snapshot.add_transformer(create_arn_transformer)
    sfn_snapshot.match("creation_response", creation_response)

    launch_and_record_sync_execution(
        target_aws_client,
        sfn_snapshot,
        state_machine_arn,
        execution_input,
    )
558

559

560
def launch_and_record_express_async_execution(
    target_aws_client,
    sfn_snapshot,
    state_machine_arn,
    log_group_name,
    execution_input,
):
    """Start an asynchronous execution and snapshot its final logged event.

    Waits on the log group until the last logged event is terminal (as judged by
    ``_is_last_history_event_terminal``) and snapshots that event as ``end_event``.

    Returns:
        The ``executionArn`` from the ``start_execution`` response.
    """
    start_response = target_aws_client.stepfunctions.start_execution(
        stateMachineArn=state_machine_arn, input=execution_input
    )
    sfn_snapshot.add_transformer(
        sfn_snapshot.transform.sfn_sm_express_exec_arn(start_response, 0)
    )
    execution_arn = start_response["executionArn"]

    logged_events = await_on_execution_logs(
        target_aws_client, log_group_name, validation_function=_is_last_history_event_terminal
    )
    # Snapshot only the end event, as AWS StepFunctions implements a flaky approach to logging previous events.
    sfn_snapshot.match("end_event", logged_events[-1])

    return execution_arn
581

582

583
def create_and_record_express_async_execution(
    target_aws_client,
    create_state_machine_iam_role,
    create_state_machine,
    sfn_create_log_group,
    sfn_snapshot,
    definition,
    execution_input,
    include_execution_data: bool = True,
) -> tuple[LongArn, LongArn]:
    """Create a logging EXPRESS state machine and snapshot one asynchronous execution.

    A log group is created and configured as the ``LogLevel.ALL`` destination of the
    new state machine. The creation response is snapshotted as ``creation_response``
    and the run is delegated to ``launch_and_record_express_async_execution``.

    Returns:
        A ``(state_machine_arn, execution_arn)`` tuple.
    """
    snf_role_arn = create_state_machine_iam_role(target_aws_client)
    sfn_snapshot.add_transformer(RegexTransformer(snf_role_arn, "sfn_role_arn"))

    log_group_name = sfn_create_log_group()
    # The first log group returned for the name prefix is taken as the created one.
    described_groups = target_aws_client.logs.describe_log_groups(
        logGroupNamePrefix=log_group_name
    )
    log_group_arn = described_groups["logGroups"][0]["arn"]

    destination = LogDestination(
        cloudWatchLogsLogGroup=CloudWatchLogsLogGroup(logGroupArn=log_group_arn)
    )
    logging_configuration = LoggingConfiguration(
        level=LogLevel.ALL,
        includeExecutionData=include_execution_data,
        destinations=[destination],
    )

    express_machine_name = f"express_statemachine_{short_uid()}"
    creation_response = create_state_machine(
        target_aws_client,
        name=express_machine_name,
        definition=definition,
        roleArn=snf_role_arn,
        type=StateMachineType.EXPRESS,
        loggingConfiguration=logging_configuration,
    )
    state_machine_arn = creation_response["stateMachineArn"]
    create_arn_transformer = sfn_snapshot.transform.sfn_sm_create_arn(creation_response, 0)
    sfn_snapshot.add_transformer(create_arn_transformer)
    sfn_snapshot.match("creation_response", creation_response)

    execution_arn = launch_and_record_express_async_execution(
        target_aws_client,
        sfn_snapshot,
        state_machine_arn,
        log_group_name,
        execution_input,
    )
    return state_machine_arn, execution_arn
630

631

632
def create_and_record_events(
    create_state_machine_iam_role,
    create_state_machine,
    sfn_events_to_sqs_queue,
    target_aws_client,
    sfn_snapshot,
    definition,
    execution_input,
):
    """Run an execution and snapshot the events delivered to an SQS queue for it.

    A state machine is created and ``sfn_events_to_sqs_queue`` is asked for a queue
    URL for it. After the execution has terminated, the queue is polled (``timeout=60``)
    until the newest collected event, ordered by its ``time`` field, has a
    ``detail.status`` other than ``RUNNING``. Whatever was collected is snapshotted as
    ``stepfunctions_events``; the polling result itself is not checked.
    """
    sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sqs_integration())
    # NOTE(review): the last replacement is a plain string, so "{short_uid()}" is used
    # literally rather than interpolated; kept as is because recorded snapshots may
    # depend on it — confirm before turning it into an f-string.
    detail_replacements = (
        ("$..detail.startDate", "start-date"),
        ("$..detail.stopDate", "stop-date"),
        ("$..detail.name", "test_event_bridge_events-{short_uid()}"),
    )
    sfn_snapshot.add_transformers_list(
        [
            JsonpathTransformer(
                jsonpath=jsonpath,
                replacement=replacement,
                replace_reference=False,
            )
            for jsonpath, replacement in detail_replacements
        ]
    )

    snf_role_arn = create_state_machine_iam_role(target_aws_client)
    create_output: CreateStateMachineOutput = create_state_machine(
        target_aws_client,
        name=f"test_event_bridge_events-{short_uid()}",
        definition=definition,
        roleArn=snf_role_arn,
    )
    state_machine_arn = create_output["stateMachineArn"]

    queue_url = sfn_events_to_sqs_queue(state_machine_arn=state_machine_arn)

    stepfunctions_client = target_aws_client.stepfunctions
    start_response = stepfunctions_client.start_execution(
        stateMachineArn=state_machine_arn, input=execution_input
    )
    await_execution_terminated(
        stepfunctions_client=target_aws_client.stepfunctions,
        execution_arn=start_response["executionArn"],
    )

    stepfunctions_events = []

    def _collect_events():
        received = target_aws_client.sqs.receive_message(QueueUrl=queue_url)
        stepfunctions_events.extend(
            [json.loads(message["Body"]) for message in received.get("Messages", [])]
        )
        stepfunctions_events.sort(key=lambda queued_event: queued_event["time"])
        return stepfunctions_events and stepfunctions_events[-1]["detail"]["status"] != "RUNNING"

    poll_condition(_collect_events, timeout=60)

    sfn_snapshot.match("stepfunctions_events", stepfunctions_events)
694

695

696
def record_sqs_events(target_aws_client, queue_url, sfn_snapshot, num_events):
    """Collect events from an SQS queue and snapshot them.

    The queue is polled (``timeout=60``) until exactly ``num_events`` message bodies
    have been collected; the polling result is not checked. Before matching as
    ``stepfunctions_events`` the events are ordered by the JSON dump of their
    ``detail`` entry.
    """
    stepfunctions_events = []

    def _collect_events():
        received = target_aws_client.sqs.receive_message(QueueUrl=queue_url)
        stepfunctions_events.extend(
            [json.loads(message["Body"]) for message in received.get("Messages", [])]
        )
        stepfunctions_events.sort(key=lambda queued_event: queued_event["time"])
        return len(stepfunctions_events) == num_events

    poll_condition(_collect_events, timeout=60)

    def _detail_as_json(queued_event):
        return json.dumps(queued_event.get("detail", dict()))

    stepfunctions_events.sort(key=_detail_as_json)
    sfn_snapshot.match("stepfunctions_events", stepfunctions_events)
710

711

712
class SfnNoneRecursiveParallelTransformer:
    """
    Normalises the sublist of events triggered by a Parallel state to be order-independent.

    Events found between a ``ParallelStateStarted`` event and the next Parallel state
    end event (succeeded, aborted, exited or failed) get their ``id`` and
    ``previousEventId`` overwritten and are sorted by their JSON string form.
    """

    def __init__(self, events_jsonpath: str = "$..events"):
        # JSONPath expression locating the event lists to normalise.
        self.events_jsonpath: str = events_jsonpath

    @staticmethod
    def _normalise_events(events: list[dict]) -> None:
        """Normalise, in place, the events nested in Parallel states of ``events``."""
        start_idx = None
        sublist = list()
        in_sublist = False
        for i, event in enumerate(events):
            event_type = event.get("type")
            if event_type is None:
                LOG.debug(
                    "No 'type' in event item '%s'.",
                    event,
                )
                in_sublist = False

            elif event_type in {
                HistoryEventType.ParallelStateSucceeded,
                HistoryEventType.ParallelStateAborted,
                HistoryEventType.ParallelStateExited,
                HistoryEventType.ParallelStateFailed,
            }:
                # NOTE(review): start_idx and sublist are not reset here, so a second
                # end event for the same Parallel state (or one without a preceding
                # start) re-applies this slice assignment — confirm this is intended.
                events[start_idx:i] = sorted(sublist, key=lambda e: to_json_str(e))
                in_sublist = False
            elif event_type == HistoryEventType.ParallelStateStarted:
                in_sublist = True
                sublist = []
                start_idx = i + 1
            elif in_sublist:
                # NOTE(review): "id" becomes a one-element tuple whereas
                # "previousEventId" becomes an int; kept unchanged because recorded
                # snapshots presumably rely on this shape — confirm.
                event["id"] = (0,)
                event["previousEventId"] = 0
                sublist.append(event)

    def transform(self, input_data: dict, *, ctx: TransformContext) -> dict:
        """Normalise every event list matched by ``self.events_jsonpath``.

        ``input_data`` is modified in place and returned. ``ctx`` is not used.
        """
        # Honour the configured path; it used to be hard-coded to "$..events".
        pattern = parse(self.events_jsonpath)
        events = pattern.find(input_data)
        if not events:
            LOG.debug("No Stepfunctions 'events' for jsonpath '%s'.", self.events_jsonpath)
            return input_data

        for events_data in events:
            self._normalise_events(events_data.value)

        return input_data
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc