• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 7ae8c08b-27e7-4df6-bfb9-4892ae974ff7

17 Mar 2025 11:44PM UTC coverage: 86.954% (+0.02%) from 86.93%
7ae8c08b-27e7-4df6-bfb9-4892ae974ff7

push

circleci

web-flow
SNS: fix Filter Policy engine to not evaluate full complex payload (#12395)

8 of 8 new or added lines in 1 file covered. (100.0%)

27 existing lines in 8 files now uncovered.

62326 of 71677 relevant lines covered (86.95%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.39
/localstack-core/localstack/services/sns/filter.py
1
import ipaddress
1✔
2
import json
1✔
3
import typing as t
1✔
4

5
from localstack.aws.api.sns import InvalidParameterException
1✔
6

7

8
class SubscriptionFilter:
1✔
9
    def check_filter_policy_on_message_attributes(
1✔
10
        self, filter_policy: dict, message_attributes: dict
11
    ):
12
        if not filter_policy:
1✔
13
            return True
1✔
14

15
        flat_policy_conditions = self.flatten_policy(filter_policy)
1✔
16

17
        return any(
1✔
18
            all(
19
                self._evaluate_filter_policy_conditions_on_attribute(
20
                    conditions,
21
                    message_attributes.get(criteria),
22
                    field_exists=criteria in message_attributes,
23
                )
24
                for criteria, conditions in flat_policy.items()
25
            )
26
            for flat_policy in flat_policy_conditions
27
        )
28

29
    def check_filter_policy_on_message_body(self, filter_policy: dict, message_body: str):
1✔
30
        try:
1✔
31
            body = json.loads(message_body)
1✔
32
            if not isinstance(body, dict):
1✔
33
                return False
1✔
34
        except json.JSONDecodeError:
1✔
35
            # Filter policies for the message body assume that the message payload is a well-formed JSON object.
36
            # See https://docs.aws.amazon.com/sns/latest/dg/sns-message-filtering.html
37
            return False
1✔
38

39
        return self._evaluate_nested_filter_policy_on_dict(filter_policy, payload=body)
1✔
40

41
    def _evaluate_nested_filter_policy_on_dict(self, filter_policy, payload: dict) -> bool:
1✔
42
        """
43
        This method evaluates the filter policy against the JSON decoded payload.
44
        Although it's not documented anywhere, AWS allows `.` in the fields name in the filter policy and the payload,
45
        and will evaluate them. However, it's not JSONPath compatible:
46
        Example:
47
        Policy: `{"field1.field2": "value1"}`
48
        This policy will match both `{"field1.field2": "value1"}` and  {"field1: {"field2": "value1"}}`, unlike JSONPath
49
        for which `.` points to a child node.
50
        This might show they are flattening the both dictionaries to a single level for an easier matching without
51
        recursion.
52
        :param filter_policy: a dict, starting at the FilterPolicy
53
        :param payload: a dict, starting at the MessageBody
54
        :return: True if the payload respect the filter policy, otherwise False
55
        """
56
        if not filter_policy:
1✔
57
            return True
×
58

59
        # TODO: maybe save/cache the flattened/expanded policy?
60
        flat_policy_conditions = self.flatten_policy(filter_policy)
1✔
61
        flat_payloads = self.flatten_payload(payload, flat_policy_conditions)
1✔
62

63
        return any(
1✔
64
            all(
65
                any(
66
                    self._evaluate_condition(
67
                        flat_payload.get(key), condition, field_exists=key in flat_payload
68
                    )
69
                    for condition in conditions
70
                    for flat_payload in flat_payloads
71
                )
72
                for key, conditions in flat_policy.items()
73
            )
74
            for flat_policy in flat_policy_conditions
75
        )
76

77
    def _evaluate_filter_policy_conditions_on_attribute(
1✔
78
        self, conditions, attribute, field_exists: bool
79
    ):
80
        if not isinstance(conditions, list):
1✔
81
            conditions = [conditions]
1✔
82

83
        tpe = attribute.get("DataType") or attribute.get("Type") if attribute else None
1✔
84
        val = attribute.get("StringValue") or attribute.get("Value") if attribute else None
1✔
85
        if attribute is not None and tpe == "String.Array":
1✔
86
            try:
1✔
87
                values = json.loads(val)
1✔
88
            except ValueError:
1✔
89
                return False
1✔
90
            for value in values:
1✔
91
                for condition in conditions:
1✔
92
                    if self._evaluate_condition(value, condition, field_exists):
1✔
93
                        return True
1✔
94
        else:
95
            for condition in conditions:
1✔
96
                value = val or None
1✔
97
                if self._evaluate_condition(value, condition, field_exists):
1✔
98
                    return True
1✔
99

100
        return False
1✔
101

102
    def _evaluate_condition(self, value, condition, field_exists: bool):
1✔
103
        if not isinstance(condition, dict):
1✔
104
            return field_exists and value == condition
1✔
105
        elif (must_exist := condition.get("exists")) is not None:
1✔
106
            # if must_exists is True then field_exists must be True
107
            # if must_exists is False then fields_exists must be False
108
            return must_exist == field_exists
1✔
109
        elif value is None:
1✔
110
            # the remaining conditions require the value to not be None
111
            return False
1✔
112
        elif anything_but := condition.get("anything-but"):
1✔
113
            if isinstance(anything_but, dict):
1✔
114
                not_prefix = anything_but.get("prefix")
1✔
115
                return not value.startswith(not_prefix)
1✔
116
            elif isinstance(anything_but, list):
1✔
117
                return value not in anything_but
1✔
118
            else:
119
                return value != anything_but
1✔
120
        elif prefix := condition.get("prefix"):
1✔
121
            return value.startswith(prefix)
1✔
122
        elif suffix := condition.get("suffix"):
1✔
123
            return value.endswith(suffix)
1✔
124
        elif equal_ignore_case := condition.get("equals-ignore-case"):
1✔
125
            return equal_ignore_case.lower() == value.lower()
1✔
126
        elif numeric_condition := condition.get("numeric"):
1✔
127
            return self._evaluate_numeric_condition(numeric_condition, value)
1✔
128
        elif cidr := condition.get("cidr"):
1✔
129
            try:
1✔
130
                ip = ipaddress.ip_address(value)
1✔
131
                return ip in ipaddress.ip_network(cidr)
1✔
132
            except ValueError:
1✔
133
                return False
1✔
134

135
        return False
×
136

137
    @staticmethod
1✔
138
    def _evaluate_numeric_condition(conditions, value):
1✔
139
        try:
1✔
140
            # try if the value is numeric
141
            value = float(value)
1✔
142
        except ValueError:
1✔
143
            # the value is not numeric, the condition is False
144
            return False
1✔
145

146
        for i in range(0, len(conditions), 2):
1✔
147
            operator = conditions[i]
1✔
148
            operand = float(conditions[i + 1])
1✔
149

150
            if operator == "=":
1✔
151
                if value != operand:
1✔
152
                    return False
1✔
153
            elif operator == ">":
1✔
154
                if value <= operand:
1✔
155
                    return False
1✔
156
            elif operator == "<":
1✔
157
                if value >= operand:
1✔
158
                    return False
1✔
159
            elif operator == ">=":
1✔
160
                if value < operand:
1✔
161
                    return False
1✔
162
            elif operator == "<=":
1✔
163
                if value > operand:
1✔
164
                    return False
1✔
165

166
        return True
1✔
167

168
    @staticmethod
1✔
169
    def flatten_policy(nested_dict: dict) -> list[dict]:
1✔
170
        """
171
        Takes a dictionary as input and will output the dictionary on a single level.
172
        Input:
173
        `{"field1": {"field2": {"field3": "val1", "field4": "val2"}}}`
174
        Output:
175
        `[
176
            {
177
                "field1.field2.field3": "val1",
178
                "field1.field2.field4": "val2"
179
            }
180
        ]`
181
        Input with $or will create multiple outputs:
182
        `{"$or": [{"field1": "val1"}, {"field2": "val2"}], "field3": "val3"}`
183
        Output:
184
        `[
185
            {"field1": "val1", "field3": "val3"},
186
            {"field2": "val2", "field3": "val3"}
187
        ]`
188
        :param nested_dict: a (nested) dictionary
189
        :return: a list of flattened dictionaries with no nested dict or list inside, flattened to a
190
        single level, one list item for every list item encountered
191
        """
192

193
        def _traverse_policy(obj, array=None, parent_key=None) -> list:
1✔
194
            if array is None:
1✔
195
                array = [{}]
1✔
196

197
            for key, values in obj.items():
1✔
198
                if key == "$or" and isinstance(values, list) and len(values) > 1:
1✔
199
                    # $or will create multiple new branches in the array.
200
                    # Each current branch will traverse with each choice in $or
201
                    array = [
1✔
202
                        i for value in values for i in _traverse_policy(value, array, parent_key)
203
                    ]
204
                else:
205
                    # We update the parent key do that {"key1": {"key2": ""}} becomes "key1.key2"
206
                    _parent_key = f"{parent_key}.{key}" if parent_key else key
1✔
207
                    if isinstance(values, dict):
1✔
208
                        # If the current key has child dict -- key: "key1", child: {"key2": ["val1", val2"]}
209
                        # We only update the parent_key and traverse its children with the current branches
210
                        array = _traverse_policy(values, array, _parent_key)
1✔
211
                    else:
212
                        # If the current key has no child, this means we found the values to match -- child: ["val1", val2"]
213
                        # we update the branches with the parent chain and the values -- {"key1.key2": ["val1, val2"]}
214
                        array = [{**item, _parent_key: values} for item in array]
1✔
215

216
            return array
1✔
217

218
        return _traverse_policy(nested_dict)
1✔
219

220
    @staticmethod
1✔
221
    def flatten_payload(payload: dict, policy_conditions: list[dict]) -> list[dict]:
1✔
222
        """
223
        Takes a dictionary as input and will output the dictionary on a single level.
224
        The dictionary can have lists containing other dictionaries, and one root level entry will be created for every
225
        item in a list if it corresponds to the entries of the policy conditions.
226
        Input:
227
        payload:
228
        `{"field1": {
229
            "field2: [
230
                {"field3": "val1", "field4": "val2"},
231
                {"field3": "val3", "field4": "val4"}
232
            }
233
        ]}`
234
        policy_conditions:
235
        `[
236
            "field1.field2.field3": <condition>,
237
            "field1.field2.field4": <condition>,
238
        ]`
239
        Output:
240
        `[
241
            {
242
                "field1.field2.field3": "val1",
243
                "field1.field2.field4": "val2"
244
            },
245
            {
246
                "field1.field2.field3": "val3",
247
                "field1.field2.field4": "val4"
248
            }
249
        ]`
250
        :param payload: a (nested) dictionary
251
        :return: flatten_dict: a dictionary with no nested dict inside, flattened to a single level
252
        """
253
        policy_keys = {key for keys in policy_conditions for key in keys}
1✔
254

255
        def _is_key_in_policy(key: str) -> bool:
1✔
256
            return key is None or any(policy_key.startswith(key) for policy_key in policy_keys)
1✔
257

258
        def _traverse(_object: dict, array=None, parent_key=None) -> list:
1✔
259
            if isinstance(_object, dict):
1✔
260
                for key, values in _object.items():
1✔
261
                    # We update the parent key so that {"key1": {"key2": ""}} becomes "key1.key2"
262
                    _parent_key = f"{parent_key}.{key}" if parent_key else key
1✔
263

264
                    # we make sure that we are building only the relevant parts of the payload related to the policy
265
                    # the payload could be very complex, and the policy only applies to part of it
266
                    if _is_key_in_policy(_parent_key):
1✔
267
                        array = _traverse(values, array, _parent_key)
1✔
268

269
            elif isinstance(_object, list):
1✔
270
                if not _object:
1✔
UNCOV
271
                    return array
×
272
                array = [i for value in _object for i in _traverse(value, array, parent_key)]
1✔
273
            else:
274
                array = [{**item, parent_key: _object} for item in array]
1✔
275

276
            return array
1✔
277

278
        return _traverse(payload, array=[{}], parent_key=None)
1✔
279

280

281
class FilterPolicyValidator:
1✔
282
    def __init__(self, scope: str, is_subscribe_call: bool):
1✔
283
        self.scope = scope
1✔
284
        self.error_prefix = (
1✔
285
            "Invalid parameter: Attributes Reason: " if is_subscribe_call else "Invalid parameter: "
286
        )
287

288
    def validate_filter_policy(self, filter_policy: dict[str, t.Any]):
1✔
289
        # # A filter policy can have a maximum of five attribute names. For a nested policy, only parent keys are counted.
290
        if len(filter_policy.values()) > 5:
1✔
291
            raise InvalidParameterException(
1✔
292
                f"{self.error_prefix}FilterPolicy: Filter policy can not have more than 5 keys"
293
            )
294

295
        aggregated_rules, combinations = self.aggregate_rules(filter_policy)
1✔
296
        # For the complexity of the filter policy, the total combination of values must not exceed 150.
297
        # https://docs.aws.amazon.com/sns/latest/dg/subscription-filter-policy-constraints.html
298
        if combinations > 150:
1✔
299
            raise InvalidParameterException(
1✔
300
                f"{self.error_prefix}FilterPolicy: Filter policy is too complex"
301
            )
302

303
        for rules in aggregated_rules:
1✔
304
            for rule in rules:
1✔
305
                self._validate_rule(rule)
1✔
306

307
    def aggregate_rules(self, filter_policy: dict[str, t.Any]) -> tuple[list[list[t.Any]], int]:
1✔
308
        """
309
        This method evaluate the filter policy recursively, and returns only a list of lists of rules.
310
        It also calculates the combinations of rules, calculated depending on the nesting of the rules.
311
        Example:
312
        nested_filter_policy = {
313
            "key_a": {
314
                "key_b": {
315
                    "key_c": ["value_one", "value_two", "value_three", "value_four"]
316
                }
317
            },
318
            "key_d": {
319
                "key_e": ["value_one", "value_two", "value_three"]
320
            }
321
        }
322
        This function then iterates on the values of the top level keys of the filter policy: ("key_a", "key_d")
323
        If the iterated value is not a list, it means it is a nested property. If the scope is `MessageBody`, it is
324
        allowed, we call this method on the value, adding a level to the depth to keep track on how deep the key is.
325
        If the value is a list, it means it contains rules: we will append this list of rules in _rules, and
326
        calculate the combinations it adds.
327
        For the example filter policy containing nested properties, we calculate it this way
328
        The first array has four values in a three-level nested key, and the second has three values in a two-level
329
        nested key. 3 x 4 x 2 x 3 = 72
330
        The return value would be:
331
        [["value_one", "value_two", "value_three", "value_four"], ["value_one", "value_two", "value_three"]]
332
        It allows us to later iterate of the list of rules in an easy way, to verify its conditions only.
333

334
        :param filter_policy: a dict, starting at the FilterPolicy
335
        :return: a tuple with a list of lists of rules and the calculated number of combinations
336
        """
337

338
        def _inner(
1✔
339
            policy_elements: dict[str, t.Any], depth: int = 1, combinations: int = 1
340
        ) -> tuple[list[list[t.Any]], int]:
341
            _rules = []
1✔
342
            for key, _value in policy_elements.items():
1✔
343
                if isinstance(_value, dict):
1✔
344
                    # From AWS docs: "unlike attribute-based policies, payload-based policies support property nesting."
345
                    sub_rules, combinations = _inner(
1✔
346
                        _value, depth=depth + 1, combinations=combinations
347
                    )
348
                    _rules.extend(sub_rules)
1✔
349
                elif isinstance(_value, list):
1✔
350
                    if not _value:
1✔
351
                        raise InvalidParameterException(
1✔
352
                            f"{self.error_prefix}FilterPolicy: Empty arrays are not allowed"
353
                        )
354

355
                    current_combination = 0
1✔
356
                    if key == "$or":
1✔
357
                        for val in _value:
1✔
358
                            sub_rules, or_combinations = _inner(
1✔
359
                                val, depth=depth, combinations=combinations
360
                            )
361
                            _rules.extend(sub_rules)
1✔
362
                            current_combination += or_combinations
1✔
363

364
                        combinations = current_combination
1✔
365
                    else:
366
                        _rules.append(_value)
1✔
367
                        combinations = combinations * len(_value) * depth
1✔
368
                else:
369
                    raise InvalidParameterException(
1✔
370
                        f'{self.error_prefix}FilterPolicy: "{key}" must be an object or an array'
371
                    )
372

373
            if self.scope == "MessageAttributes" and depth > 1:
1✔
374
                raise InvalidParameterException(
1✔
375
                    f"{self.error_prefix}Filter policy scope MessageAttributes does not support nested filter policy"
376
                )
377

378
            return _rules, combinations
1✔
379

380
        return _inner(filter_policy)
1✔
381

382
    def _validate_rule(self, rule: t.Any) -> None:
1✔
383
        match rule:
1✔
384
            case None | str() | bool():
1✔
385
                return
1✔
386

387
            case int() | float():
1✔
388
                # TODO: AWS says they support only from -10^9 to 10^9 but seems to accept it, so we just return
389
                # if rule <= -1000000000 or rule >= 1000000000:
390
                #     raise ""
391
                return
1✔
392

393
            case {**kwargs}:
1✔
394
                if len(kwargs) != 1:
1✔
395
                    raise InvalidParameterException(
1✔
396
                        f"{self.error_prefix}FilterPolicy: Only one key allowed in match expression"
397
                    )
398

399
                operator, value = None, None
1✔
400
                for k, v in kwargs.items():
1✔
401
                    operator, value = k, v
1✔
402

403
                if operator in (
1✔
404
                    "equals-ignore-case",
405
                    "prefix",
406
                    "suffix",
407
                ):
408
                    if not isinstance(value, str):
1✔
409
                        raise InvalidParameterException(
1✔
410
                            f"{self.error_prefix}FilterPolicy: {operator} match pattern must be a string"
411
                        )
412
                    return
1✔
413

414
                elif operator == "anything-but":
1✔
415
                    # anything-but can actually contain any kind of simple rule (str, number, and list)
416
                    if isinstance(value, list):
1✔
417
                        for v in value:
1✔
418
                            self._validate_rule(v)
1✔
419

420
                        return
1✔
421

422
                    # or have a nested `prefix` pattern
423
                    elif isinstance(value, dict):
1✔
424
                        for inner_operator in value.keys():
1✔
425
                            if inner_operator != "prefix":
1✔
426
                                raise InvalidParameterException(
1✔
427
                                    f"{self.error_prefix}FilterPolicy: Unsupported anything-but pattern: {inner_operator}"
428
                                )
429

430
                    self._validate_rule(value)
1✔
431
                    return
1✔
432

433
                elif operator == "exists":
1✔
434
                    if not isinstance(value, bool):
1✔
435
                        raise InvalidParameterException(
1✔
436
                            f"{self.error_prefix}FilterPolicy: exists match pattern must be either true or false."
437
                        )
438
                    return
1✔
439

440
                elif operator == "numeric":
1✔
441
                    self._validate_numeric_condition(value)
1✔
442

443
                elif operator == "cidr":
1✔
444
                    self._validate_cidr_condition(value)
1✔
445

446
                else:
447
                    raise InvalidParameterException(
1✔
448
                        f"{self.error_prefix}FilterPolicy: Unrecognized match type {operator}"
449
                    )
450

451
            case _:
1✔
452
                raise InvalidParameterException(
1✔
453
                    f"{self.error_prefix}FilterPolicy: Match value must be String, number, true, false, or null"
454
                )
455

456
    def _validate_cidr_condition(self, value):
1✔
457
        if not isinstance(value, str):
1✔
458
            # `cidr` returns the prefix error
459
            raise InvalidParameterException(
1✔
460
                f"{self.error_prefix}FilterPolicy: prefix match pattern must be a string"
461
            )
462
        splitted = value.split("/")
1✔
463
        if len(splitted) != 2:
1✔
464
            raise InvalidParameterException(
1✔
465
                f"{self.error_prefix}FilterPolicy: Malformed CIDR, one '/' required"
466
            )
467
        ip_addr, mask = value.split("/")
1✔
468
        try:
1✔
469
            int(mask)
1✔
470
        except ValueError:
1✔
471
            raise InvalidParameterException(
1✔
472
                f"{self.error_prefix}FilterPolicy: Malformed CIDR, mask bits must be an integer"
473
            )
474
        try:
1✔
475
            ipaddress.ip_network(value)
1✔
476
        except ValueError:
1✔
477
            raise InvalidParameterException(
1✔
478
                f"{self.error_prefix}FilterPolicy: Nonstandard IP address: {ip_addr}"
479
            )
480

481
    def _validate_numeric_condition(self, value):
1✔
482
        if not value:
1✔
483
            raise InvalidParameterException(
1✔
484
                f"{self.error_prefix}FilterPolicy: Invalid member in numeric match: ]"
485
            )
486
        num_values = value[::-1]
1✔
487

488
        operator = num_values.pop()
1✔
489
        if not isinstance(operator, str):
1✔
490
            raise InvalidParameterException(
1✔
491
                f"{self.error_prefix}FilterPolicy: Invalid member in numeric match: {operator}"
492
            )
493
        elif operator not in ("<", "<=", "=", ">", ">="):
1✔
494
            raise InvalidParameterException(
1✔
495
                f"{self.error_prefix}FilterPolicy: Unrecognized numeric range operator: {operator}"
496
            )
497

498
        value = num_values.pop() if num_values else None
1✔
499
        if not isinstance(value, (int, float)):
1✔
500
            exc_operator = "equals" if operator == "=" else operator
1✔
501
            raise InvalidParameterException(
1✔
502
                f"{self.error_prefix}FilterPolicy: Value of {exc_operator} must be numeric"
503
            )
504

505
        if not num_values:
1✔
506
            return
×
507

508
        if operator not in (">", ">="):
1✔
509
            raise InvalidParameterException(
1✔
510
                f"{self.error_prefix}FilterPolicy: Too many elements in numeric expression"
511
            )
512

513
        second_operator = num_values.pop()
1✔
514
        if not isinstance(second_operator, str):
1✔
515
            raise InvalidParameterException(
1✔
516
                f"{self.error_prefix}FilterPolicy: Bad value in numeric range: {second_operator}"
517
            )
518
        elif second_operator not in ("<", "<="):
1✔
519
            raise InvalidParameterException(
1✔
520
                f"{self.error_prefix}FilterPolicy: Bad numeric range operator: {second_operator}"
521
            )
522

523
        second_value = num_values.pop() if num_values else None
1✔
524
        if not isinstance(second_value, (int, float)):
1✔
525
            exc_operator = "equals" if second_operator == "=" else second_operator
1✔
526
            raise InvalidParameterException(
1✔
527
                f"{self.error_prefix}FilterPolicy: Value of {exc_operator} must be numeric"
528
            )
529

530
        elif second_value <= value:
1✔
531
            raise InvalidParameterException(
1✔
532
                f"{self.error_prefix}FilterPolicy: Bottom must be less than top"
533
            )
534

535
        elif num_values:
1✔
536
            raise InvalidParameterException(
1✔
537
                f"{self.error_prefix}FilterPolicy: Too many terms in numeric range expression"
538
            )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc