• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 16820655284

07 Aug 2025 05:03PM UTC coverage: 86.841% (-0.05%) from 86.892%
16820655284

push

github

web-flow
CFNV2: support CDK bootstrap and deployment (#12967)

32 of 38 new or added lines in 5 files covered. (84.21%)

2013 existing lines in 125 files now uncovered.

66606 of 76699 relevant lines covered (86.84%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.37
/localstack-core/localstack/utils/aws/dead_letter_queue.py
1
import json
1✔
2
import logging
1✔
3
import uuid
1✔
4

5
from localstack.aws.connect import connect_to
1✔
6
from localstack.utils.aws import arns
1✔
7
from localstack.utils.strings import convert_to_printable_chars, first_char_to_upper
1✔
8

9
# Module-level logger, named after this module's import path.
LOG = logging.getLogger(__name__)
1✔
10

11

12
def sns_error_to_dead_letter_queue(
    sns_subscriber: dict,
    message: str,
    error: str,
    **kwargs,
):
    """Forward a failed SNS delivery to the subscription's dead letter queue.

    The DLQ target is read from the ``deadLetterTargetArn`` entry of the
    subscriber's JSON-encoded ``RedrivePolicy``. Nothing is sent (and ``None``
    is returned) when no target is configured, or when ``kwargs`` contains
    anything other than ``MessageAttributes``, ``MessageGroupId`` or
    ``MessageDeduplicationId``.

    :param sns_subscriber: subscription attributes; ``RedrivePolicy`` is
        optional, ``SubscriptionArn`` is required once a DLQ is configured
    :param message: the message body that could not be delivered
    :param error: description of the delivery failure
    :param kwargs: optional extra fields forwarded with the message
    :return: the result of ``_send_to_dead_letter_queue``, or ``None`` if
        nothing was sent
    """
    redrive_policy = json.loads(sns_subscriber.get("RedrivePolicy") or "{}")
    dlq_arn = redrive_policy.get("deadLetterTargetArn")
    if not dlq_arn:
        # no dead letter queue configured for this subscription
        return None

    allowed_arguments = {"MessageAttributes", "MessageGroupId", "MessageDeduplicationId"}
    unsupported = set(kwargs) - allowed_arguments
    if unsupported:
        LOG.warning(
            "Not publishing to the DLQ - invalid arguments passed to the DLQ '%s'", unsupported
        )
        return None

    payload = {"message": message}
    payload.update(kwargs)
    subscription_arn = sns_subscriber["SubscriptionArn"]
    return _send_to_dead_letter_queue(subscription_arn, dlq_arn, payload, error)
1✔
34

35

36
def _send_to_dead_letter_queue(source_arn: str, dlq_arn: str, event: dict, error, role: str = None):
1✔
37
    if not dlq_arn:
1✔
38
        return
1✔
39
    LOG.info("Sending failed execution %s to dead letter queue %s", source_arn, dlq_arn)
1✔
40
    messages = _prepare_messages_to_dlq(source_arn, event, error)
1✔
41
    source_service = arns.extract_service_from_arn(source_arn)
1✔
42
    region = arns.extract_region_from_arn(dlq_arn)
1✔
43
    if role:
1✔
44
        clients = connect_to.with_assumed_role(
1✔
45
            role_arn=role, service_principal=source_service, region_name=region
46
        )
47
    else:
48
        clients = connect_to(region_name=region)
1✔
49
    if ":sqs:" in dlq_arn:
1✔
50
        queue_url = arns.sqs_queue_url_for_arn(dlq_arn)
1✔
51
        sqs_client = clients.sqs.request_metadata(
1✔
52
            source_arn=source_arn, service_principal=source_service
53
        )
54
        error = None
1✔
55
        result_code = None
1✔
56
        try:
1✔
57
            result = sqs_client.send_message_batch(QueueUrl=queue_url, Entries=messages)
1✔
58
            result_code = result.get("ResponseMetadata", {}).get("HTTPStatusCode")
1✔
UNCOV
59
        except Exception as e:
×
60
            error = e
×
61
        if error or not result_code or result_code >= 400:
1✔
UNCOV
62
            msg = "Unable to send message to dead letter queue %s (code %s): %s" % (
×
63
                queue_url,
64
                result_code,
65
                error,
66
            )
UNCOV
67
            if "InvalidMessageContents" in str(error):
×
68
                msg += f" - messages: {messages}"
×
69
            LOG.info(msg)
×
70
            raise Exception(msg)
×
71
    elif ":sns:" in dlq_arn:
1✔
72
        sns_client = clients.sns.request_metadata(
1✔
73
            source_arn=source_arn, service_principal=source_service
74
        )
75
        for message in messages:
1✔
76
            sns_client.publish(
1✔
77
                TopicArn=dlq_arn,
78
                Message=message["MessageBody"],
79
                MessageAttributes=message["MessageAttributes"],
80
            )
81
    else:
UNCOV
82
        LOG.warning("Unsupported dead letter queue type: %s", dlq_arn)
×
83
    return dlq_arn
1✔
84

85

86
def _prepare_messages_to_dlq(source_arn: str, event: dict, error) -> list[dict]:
1✔
87
    messages = []
1✔
88
    custom_attrs = {
1✔
89
        "RequestID": {"DataType": "String", "StringValue": str(uuid.uuid4())},
90
        "ErrorCode": {"DataType": "String", "StringValue": "200"},
91
        "ErrorMessage": {"DataType": "String", "StringValue": str(error)},
92
    }
93
    if ":sqs:" in source_arn:
1✔
UNCOV
94
        custom_attrs["ErrorMessage"]["StringValue"] = str(error.result)
×
95
        for record in event.get("Records", []):
×
96
            msg_attrs = message_attributes_to_upper(record.get("messageAttributes"))
×
97
            message_attrs = {**msg_attrs, **custom_attrs}
×
98
            messages.append(
×
99
                {
100
                    "Id": record.get("messageId"),
101
                    "MessageBody": record.get("body"),
102
                    "MessageAttributes": message_attrs,
103
                }
104
            )
105
    elif ":sns:" in source_arn:
1✔
106
        # event can also contain: MessageAttributes, MessageGroupId, MessageDeduplicationId
107
        message = {
1✔
108
            "Id": str(uuid.uuid4()),
109
            "MessageBody": event.pop("message"),
110
            **event,
111
        }
112
        messages.append(message)
1✔
113

114
    elif ":lambda:" in source_arn:
1✔
115
        custom_attrs["ErrorCode"]["DataType"] = "Number"
1✔
116
        # not sure about what type of error can come here
117
        try:
1✔
118
            error_message = json.loads(error.result)["errorMessage"]
1✔
119
            custom_attrs["ErrorMessage"]["StringValue"] = error_message
1✔
UNCOV
120
        except (ValueError, KeyError):
×
121
            # using old behaviour
UNCOV
122
            custom_attrs["ErrorMessage"]["StringValue"] = str(error)
×
123

124
        messages.append(
1✔
125
            {
126
                "Id": str(uuid.uuid4()),
127
                "MessageBody": json.dumps(event),
128
                "MessageAttributes": custom_attrs,
129
            }
130
        )
131
    # make sure we only have printable strings in the message attributes
132
    for message in messages:
1✔
133
        if message.get("MessageAttributes"):
1✔
134
            message["MessageAttributes"] = convert_to_printable_chars(message["MessageAttributes"])
1✔
135
    return messages
1✔
136

137

138
def message_attributes_to_upper(message_attrs: dict) -> dict:
    """Upper-case the first character of each attribute detail key (e.g., stringValue -> StringValue).

    Dict-valued attributes are rewritten in place; values that are not dicts
    are left untouched. A falsy input yields a new empty dict, otherwise the
    (mutated) input object itself is returned.
    """
    if not message_attrs:
        return {}
    for details in message_attrs.values():
        if isinstance(details, dict):
            # snapshot the keys, since the dict is modified while renaming
            for name in list(details):
                details[first_char_to_upper(name)] = details.pop(name)
    return message_attrs
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc