• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 18149474864

30 Sep 2025 06:41PM UTC coverage: 86.878% (-0.008%) from 86.886%
18149474864

push

github

web-flow
Cfn: Fix backslash processing for dynamic replacement values (#13212)

1 of 1 new or added line in 1 file covered. (100.0%)

222 existing lines in 9 files now uncovered.

67811 of 78053 relevant lines covered (86.88%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.71
/localstack-core/localstack/services/sqs/utils.py
1
import base64
1✔
2
import hashlib
1✔
3
import itertools
1✔
4
import json
1✔
5
import re
1✔
6
import struct
1✔
7
import time
1✔
8
from typing import Any, Literal, NamedTuple
1✔
9
from urllib.parse import urlparse
1✔
10

11
from localstack.aws.api.sqs import (
1✔
12
    AttributeNameList,
13
    Message,
14
    MessageAttributeNameList,
15
    QueueAttributeName,
16
    ReceiptHandleIsInvalid,
17
)
18
from localstack.services.sqs.constants import (
1✔
19
    DOMAIN_STRATEGY_URL_REGEX,
20
    LEGACY_STRATEGY_URL_REGEX,
21
    PATH_STRATEGY_URL_REGEX,
22
    STANDARD_STRATEGY_URL_REGEX,
23
)
24
from localstack.utils.aws.arns import parse_arn
1✔
25
from localstack.utils.objects import singleton_factory
1✔
26
from localstack.utils.strings import base64_decode, long_uid, to_bytes, to_str
1✔
27

28
STANDARD_ENDPOINT = re.compile(STANDARD_STRATEGY_URL_REGEX)
1✔
29
DOMAIN_ENDPOINT = re.compile(DOMAIN_STRATEGY_URL_REGEX)
1✔
30
PATH_ENDPOINT = re.compile(PATH_STRATEGY_URL_REGEX)
1✔
31
LEGACY_ENDPOINT = re.compile(LEGACY_STRATEGY_URL_REGEX)
1✔
32

33
STRING_TYPE_FIELD_INDEX = 1
1✔
34
BINARY_TYPE_FIELD_INDEX = 2
1✔
35
STRING_LIST_TYPE_FIELD_INDEX = 3
1✔
36
BINARY_LIST_TYPE_FIELD_INDEX = 4
1✔
37

38

39
def is_sqs_queue_url(url: str) -> bool:
1✔
40
    return any(
1✔
41
        [
42
            STANDARD_ENDPOINT.search(url),
43
            DOMAIN_ENDPOINT.search(url),
44
            PATH_ENDPOINT.search(url),
45
            LEGACY_ENDPOINT.search(url),
46
        ]
47
    )
48

49

50
def guess_endpoint_strategy_and_host(
1✔
51
    host: str,
52
) -> tuple[Literal["standard", "domain", "path"], str]:
53
    """
54
    This method is used for the dynamic endpoint strategy. It heuristically determines a tuple where the first
55
    element is the endpoint strategy, and the second is the part of the host after the endpoint prefix and region.
56
    For instance:
57

58
      * ``sqs.us-east-1.localhost.localstack.cloud`` -> ``standard, localhost.localstack.cloud``
59
      * ``queue.localhost.localstack.cloud:4566`` -> ``domain, localhost.localstack.cloud:4566``
60
      * ``us-east-2.queue.amazonaws.com`` -> ``domain, amazonaws.com``
61
      * ``localhost:4566`` -> ``path, localhost:443``
62
      * ``amazonaws.com`` -> ``path, amazonaws.com``
63

64
    :param host: the original host in the request
65
    :return: endpoint strategy, host segment
66
    """
67
    components = host.split(".")
1✔
68

69
    if host.startswith("sqs."):
1✔
70
        return "standard", ".".join(components[2:])
1✔
71

72
    if host.startswith("queue."):
1✔
73
        return "domain", ".".join(components[1:])
1✔
74

75
    if len(components) > 2 and components[1] == "queue":
1✔
76
        return "domain", ".".join(components[2:])
1✔
77

78
    return "path", host
1✔
79

80

81
def is_message_deduplication_id_required(queue):
1✔
82
    content_based_deduplication_disabled = (
1✔
83
        "false"
84
        == (queue.attributes.get(QueueAttributeName.ContentBasedDeduplication, "false")).lower()
85
    )
86
    return is_fifo_queue(queue) and content_based_deduplication_disabled
1✔
87

88

89
def is_fifo_queue(queue):
1✔
90
    return "true" == queue.attributes.get(QueueAttributeName.FifoQueue, "false").lower()
1✔
91

92

93
def parse_queue_url(queue_url: str) -> tuple[str, str | None, str]:
1✔
94
    """
95
    Parses an SQS Queue URL and returns a triple of account_id, region and queue_name.
96

97
    :param queue_url: the queue URL
98
    :return: account_id, region (may be None), queue_name
99
    """
100
    url = urlparse(queue_url.rstrip("/"))
1✔
101
    path_parts = url.path.lstrip("/").split("/")
1✔
102
    domain_parts = url.netloc.split(".")
1✔
103

104
    if len(path_parts) != 2 and len(path_parts) != 4:
1✔
105
        raise ValueError(f"Not a valid queue URL: {queue_url}")
1✔
106

107
    account_id, queue_name = path_parts[-2:]
1✔
108

109
    if len(path_parts) == 4:
1✔
110
        if path_parts[0] != "queue":
1✔
111
            raise ValueError(f"Not a valid queue URL: {queue_url}")
1✔
112
        # SQS_ENDPOINT_STRATEGY == "path"
113
        region = path_parts[1]
1✔
114
    elif url.netloc.startswith("sqs."):
1✔
115
        # SQS_ENDPOINT_STRATEGY == "standard"
116
        region = domain_parts[1]
1✔
117
    elif ".queue." in url.netloc:
1✔
118
        if domain_parts[1] != "queue":
1✔
119
            # .queue. should be on second position after the region
120
            raise ValueError(f"Not a valid queue URL: {queue_url}")
1✔
121
        # SQS_ENDPOINT_STRATEGY == "domain"
122
        region = domain_parts[0]
1✔
123
    elif url.netloc.startswith("queue"):
1✔
124
        # SQS_ENDPOINT_STRATEGY == "domain" (with default region)
125
        region = "us-east-1"
1✔
126
    else:
127
        region = None
1✔
128

129
    return account_id, region, queue_name
1✔
130

131

132
class ReceiptHandleInformation(NamedTuple):
1✔
133
    identifier: str
1✔
134
    queue_arn: str
1✔
135
    message_id: str
1✔
136
    last_received: str
1✔
137

138

139
def extract_receipt_handle_info(receipt_handle: str) -> ReceiptHandleInformation:
1✔
140
    try:
1✔
141
        handle = base64.b64decode(receipt_handle).decode("utf-8")
1✔
142
        parts = handle.split(" ")
1✔
143
        if len(parts) != 4:
1✔
UNCOV
144
            raise ValueError(f'The input receipt handle "{receipt_handle}" is incomplete.')
×
145
        parse_arn(parts[1])
1✔
146
        return ReceiptHandleInformation(*parts)
1✔
147
    except (IndexError, ValueError) as e:
1✔
148
        raise ReceiptHandleIsInvalid(
1✔
149
            f'The input receipt handle "{receipt_handle}" is not a valid receipt handle.'
150
        ) from e
151

152

153
def encode_receipt_handle(queue_arn, message) -> str:
1✔
154
    # http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/ImportantIdentifiers.html#ImportantIdentifiers-receipt-handles
155
    # encode the queue arn in the receipt handle, so we can later check if it belongs to the queue
156
    # but also add some randomness s.t. the generated receipt handles look like the ones from AWS
157
    handle = f"{long_uid()} {queue_arn} {message.message.get('MessageId')} {message.last_received}"
1✔
158
    encoded = base64.b64encode(handle.encode("utf-8"))
1✔
159
    return encoded.decode("utf-8")
1✔
160

161

162
def encode_move_task_handle(task_id: str, source_arn: str) -> str:
1✔
163
    """
164
    Move task handles are base64 encoded JSON dictionaries containing the task id and the source arn.
165

166
    :param task_id: the move task id
167
    :param source_arn: the source queue arn
168
    :return: a string of a base64 encoded json doc
169
    """
170
    doc = f'{{"taskId":"{task_id}","sourceArn":"{source_arn}"}}'
1✔
171
    return to_str(base64.b64encode(to_bytes(doc)))
1✔
172

173

174
def decode_move_task_handle(handle: str | bytes) -> tuple[str, str]:
1✔
175
    """
176
    Inverse operation of ``encode_move_task_handle``.
177

178
    :param handle: the base64 encoded task handle
179
    :return: a tuple of task_id and source_arn
180
    :raises ValueError: if the handle is not encoded correctly or does not contain the necessary fields
181
    """
182
    doc = json.loads(base64_decode(handle))
1✔
183
    if "taskId" not in doc:
1✔
UNCOV
184
        raise ValueError("taskId not found in handle")
×
185
    if "sourceArn" not in doc:
1✔
UNCOV
186
        raise ValueError("sourceArn not found in handle")
×
187
    return doc["taskId"], doc["sourceArn"]
1✔
188

189

190
@singleton_factory
1✔
191
def global_message_sequence():
1✔
192
    # creates a 20-digit number used as the start for the global sequence
193
    start = int(time.time()) << 33
1✔
194
    # itertools.count is thread safe over the GIL since its getAndIncrement operation is a single python bytecode op
195
    return itertools.count(start)
1✔
196

197

198
def generate_message_id():
1✔
199
    return long_uid()
1✔
200

201

202
def message_filter_attributes(message: Message, names: AttributeNameList | None):
1✔
203
    """
204
    Utility function filter from the given message (in-place) the system attributes from the given list. It will
205
    apply all rules according to:
206
    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs.html#SQS.Client.receive_message.
207

208
    :param message: The message to filter (it will be modified)
209
    :param names: the attributes names/filters
210
    """
211
    if "Attributes" not in message:
1✔
UNCOV
212
        return
×
213

214
    if not names:
1✔
215
        del message["Attributes"]
1✔
216
        return
1✔
217

218
    if QueueAttributeName.All in names:
1✔
219
        return
1✔
220

221
    for k in list(message["Attributes"].keys()):
1✔
222
        if k not in names:
1✔
223
            del message["Attributes"][k]
1✔
224

225

226
def message_filter_message_attributes(message: Message, names: MessageAttributeNameList | None):
1✔
227
    """
228
    Utility function filter from the given message (in-place) the message attributes from the given list. It will
229
    apply all rules according to:
230
    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs.html#SQS.Client.receive_message.
231

232
    :param message: The message to filter (it will be modified)
233
    :param names: the attributes names/filters (can be 'All', '.*', '*' or prefix filters like 'Foo.*')
234
    """
235
    if not message.get("MessageAttributes"):
1✔
236
        return
1✔
237

238
    if not names:
1✔
239
        del message["MessageAttributes"]
1✔
240
        return
1✔
241

242
    if "All" in names or ".*" in names or "*" in names:
1✔
243
        return
1✔
244

245
    attributes = message["MessageAttributes"]
1✔
246
    matched = []
1✔
247

248
    keys = [name for name in names if ".*" not in name]
1✔
249
    prefixes = [name.split(".*")[0] for name in names if ".*" in name]
1✔
250

251
    # match prefix filters
252
    for k in attributes:
1✔
253
        if k in keys:
1✔
254
            matched.append(k)
1✔
255
            continue
1✔
256

257
        for prefix in prefixes:
1✔
258
            if k.startswith(prefix):
1✔
259
                matched.append(k)
1✔
260
            break
1✔
261
    if matched:
1✔
262
        message["MessageAttributes"] = {k: attributes[k] for k in matched}
1✔
263
    else:
264
        message.pop("MessageAttributes")
1✔
265

266

267
def _utf8(value: Any) -> bytes:  # type: ignore[misc]
1✔
268
    if isinstance(value, str):
1✔
269
        return value.encode("utf-8")
1✔
UNCOV
270
    return value
×
271

272

273
def _update_binary_length_and_value(md5: Any, value: bytes) -> None:  # type: ignore[misc]
1✔
274
    length_bytes = struct.pack("!I".encode("ascii"), len(value))
1✔
275
    md5.update(length_bytes)
1✔
276
    md5.update(value)
1✔
277

278

279
def create_message_attribute_hash(message_attributes) -> str | None:
1✔
280
    """
281
    Method from moto's attribute_md5 of moto/sqs/models.py, separated from the Message Object.
282
    """
283
    # To avoid the need to check for dict conformity everytime we invoke this function
284
    if not isinstance(message_attributes, dict):
1✔
285
        return
1✔
286

287
    hash = hashlib.md5()
1✔
288

289
    for attrName in sorted(message_attributes.keys()):
1✔
290
        attr_value = message_attributes[attrName]
1✔
291
        # Encode name
292
        _update_binary_length_and_value(hash, _utf8(attrName))
1✔
293
        # Encode data type
294
        _update_binary_length_and_value(hash, _utf8(attr_value["DataType"]))
1✔
295
        # Encode transport type and value
296
        if attr_value.get("StringValue"):
1✔
297
            hash.update(bytearray([STRING_TYPE_FIELD_INDEX]))
1✔
298
            _update_binary_length_and_value(hash, _utf8(attr_value.get("StringValue")))
1✔
299
        elif attr_value.get("BinaryValue"):
1✔
300
            hash.update(bytearray([BINARY_TYPE_FIELD_INDEX]))
1✔
301
            decoded_binary_value = attr_value.get("BinaryValue")
1✔
302
            _update_binary_length_and_value(hash, decoded_binary_value)
1✔
303
        # string_list_value, binary_list_value type is not implemented, reserved for the future use.
304
        # See https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_MessageAttributeValue.html
305
    return hash.hexdigest()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc