• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 7ae8c08b-27e7-4df6-bfb9-4892ae974ff7

17 Mar 2025 11:44PM UTC coverage: 86.954% (+0.02%) from 86.93%
7ae8c08b-27e7-4df6-bfb9-4892ae974ff7

push

circleci

web-flow
SNS: fix Filter Policy engine to not evaluate full complex payload (#12395)

8 of 8 new or added lines in 1 file covered. (100.0%)

27 existing lines in 8 files now uncovered.

62326 of 71677 relevant lines covered (86.95%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.11
/localstack-core/localstack/services/events/event_rule_engine.py
1
import ipaddress
1✔
2
import json
1✔
3
import re
1✔
4
import typing as t
1✔
5

6
from localstack.aws.api.events import InvalidEventPatternException
1✔
7

8

9
class EventRuleEngine:
1✔
10
    def evaluate_pattern_on_event(self, compiled_event_pattern: dict, event: str | dict):
1✔
11
        if isinstance(event, str):
1✔
12
            try:
1✔
13
                body = json.loads(event)
1✔
14
                if not isinstance(body, dict):
1✔
15
                    return False
×
16
            except json.JSONDecodeError:
×
17
                # Event pattern for the message body assume that the message payload is a well-formed JSON object.
18
                return False
×
19
        else:
20
            body = event
1✔
21

22
        return self._evaluate_nested_event_pattern_on_dict(compiled_event_pattern, payload=body)
1✔
23

24
    def _evaluate_nested_event_pattern_on_dict(self, event_pattern, payload: dict) -> bool:
1✔
25
        """
26
        This method evaluates the event pattern against the JSON decoded payload.
27
        Although it's not documented anywhere, AWS allows `.` in the fields name in the event pattern and the payload,
28
        and will evaluate them. However, it's not JSONPath compatible.
29
        See:
30
        https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-create-pattern.html#eb-create-pattern-considerations
31
        Example:
32
        Pattern: `{"field1.field2": "value1"}`
33
        This pattern will match both `{"field1.field2": "value1"}` and  {"field1: {"field2": "value1"}}`, unlike JSONPath
34
        for which `.` points to a child node.
35
        This might show they are flattening the both dictionaries to a single level for an easier matching without
36
        recursion.
37
        :param event_pattern: a dict, starting at the Event Pattern
38
        :param payload: a dict, starting at the MessageBody
39
        :return: True if the payload respect the event pattern, otherwise False
40
        """
41
        if not event_pattern:
1✔
42
            return True
×
43

44
        # TODO: maybe save/cache the flattened/expanded pattern?
45
        flat_pattern_conditions = self.flatten_pattern(event_pattern)
1✔
46
        flat_payloads = self.flatten_payload(payload, flat_pattern_conditions)
1✔
47

48
        return any(
1✔
49
            all(
50
                any(
51
                    self._evaluate_condition(
52
                        flat_payload.get(key), condition, field_exists=key in flat_payload
53
                    )
54
                    for condition in conditions
55
                    for flat_payload in flat_payloads
56
                )
57
                for key, conditions in flat_pattern.items()
58
            )
59
            for flat_pattern in flat_pattern_conditions
60
        )
61

62
    def _evaluate_condition(self, value, condition, field_exists: bool):
1✔
63
        if not isinstance(condition, dict):
1✔
64
            return field_exists and value == condition
1✔
65
        elif (must_exist := condition.get("exists")) is not None:
1✔
66
            # if must_exists is True then field_exists must be True
67
            # if must_exists is False then fields_exists must be False
68
            return must_exist == field_exists
1✔
69
        elif (anything_but := condition.get("anything-but")) is not None:
1✔
70
            if isinstance(anything_but, dict):
1✔
71
                if (not_condition := anything_but.get("prefix")) is not None:
1✔
72
                    predicate = self._evaluate_prefix
1✔
73
                elif (not_condition := anything_but.get("suffix")) is not None:
1✔
74
                    predicate = self._evaluate_suffix
1✔
75
                elif (not_condition := anything_but.get("equals-ignore-case")) is not None:
1✔
76
                    predicate = self._evaluate_equal_ignore_case
1✔
77
                elif (not_condition := anything_but.get("wildcard")) is not None:
1✔
78
                    predicate = self._evaluate_wildcard
1✔
79
                else:
80
                    # this should not happen as we validate the EventPattern before
81
                    return False
×
82

83
                if isinstance(not_condition, str):
1✔
84
                    return not predicate(not_condition, value)
1✔
85
                elif isinstance(not_condition, list):
1✔
86
                    return all(
1✔
87
                        not predicate(sub_condition, value) for sub_condition in not_condition
88
                    )
89

90
            elif isinstance(anything_but, list):
1✔
91
                return value not in anything_but
1✔
92
            else:
93
                return value != anything_but
1✔
94

95
        elif value is None:
1✔
96
            # the remaining conditions require the value to not be None
97
            return False
1✔
98
        elif (prefix := condition.get("prefix")) is not None:
1✔
99
            if isinstance(prefix, dict):
1✔
100
                if (prefix_equal_ignore_case := prefix.get("equals-ignore-case")) is not None:
1✔
101
                    return self._evaluate_prefix(prefix_equal_ignore_case.lower(), value.lower())
1✔
102
            else:
103
                return self._evaluate_prefix(prefix, value)
1✔
104

105
        elif (suffix := condition.get("suffix")) is not None:
1✔
106
            if isinstance(suffix, dict):
1✔
107
                if suffix_equal_ignore_case := suffix.get("equals-ignore-case"):
1✔
108
                    return self._evaluate_suffix(suffix_equal_ignore_case.lower(), value.lower())
1✔
109
            else:
110
                return self._evaluate_suffix(suffix, value)
1✔
111

112
        elif (equal_ignore_case := condition.get("equals-ignore-case")) is not None:
1✔
113
            return self._evaluate_equal_ignore_case(equal_ignore_case, value)
1✔
114

115
        # we validated that `numeric`  should be a non-empty list when creating the rule, we don't need the None check
116
        elif numeric_condition := condition.get("numeric"):
1✔
117
            return self._evaluate_numeric_condition(numeric_condition, value)
1✔
118

119
        # we also validated the `cidr` that it cannot be empty
120
        elif cidr := condition.get("cidr"):
1✔
121
            return self._evaluate_cidr(cidr, value)
1✔
122

123
        elif (wildcard := condition.get("wildcard")) is not None:
1✔
124
            return self._evaluate_wildcard(wildcard, value)
1✔
125

126
        return False
×
127

128
    @staticmethod
1✔
129
    def _evaluate_prefix(condition: str | list, value: str) -> bool:
1✔
130
        return value.startswith(condition)
1✔
131

132
    @staticmethod
1✔
133
    def _evaluate_suffix(condition: str | list, value: str) -> bool:
1✔
134
        return value.endswith(condition)
1✔
135

136
    @staticmethod
1✔
137
    def _evaluate_equal_ignore_case(condition: str, value: str) -> bool:
1✔
138
        return condition.lower() == value.lower()
1✔
139

140
    @staticmethod
1✔
141
    def _evaluate_cidr(condition: str, value: str) -> bool:
1✔
142
        try:
1✔
143
            ip = ipaddress.ip_address(value)
1✔
144
            return ip in ipaddress.ip_network(condition)
1✔
145
        except ValueError:
1✔
146
            return False
1✔
147

148
    @staticmethod
1✔
149
    def _evaluate_wildcard(condition: str, value: str) -> bool:
1✔
150
        return bool(re.match(re.escape(condition).replace("\\*", ".+") + "$", value))
1✔
151

152
    @staticmethod
1✔
153
    def _evaluate_numeric_condition(conditions: list, value: t.Any) -> bool:
1✔
154
        if not isinstance(value, (int, float)):
1✔
155
            return False
1✔
156
        try:
1✔
157
            # try if the value is numeric
158
            value = float(value)
1✔
159
        except ValueError:
×
160
            # the value is not numeric, the condition is False
161
            return False
×
162

163
        for i in range(0, len(conditions), 2):
1✔
164
            operator = conditions[i]
1✔
165
            operand = float(conditions[i + 1])
1✔
166

167
            if operator == "=":
1✔
168
                if value != operand:
1✔
169
                    return False
1✔
170
            elif operator == ">":
1✔
171
                if value <= operand:
1✔
172
                    return False
1✔
173
            elif operator == "<":
1✔
174
                if value >= operand:
1✔
175
                    return False
1✔
176
            elif operator == ">=":
1✔
177
                if value < operand:
1✔
178
                    return False
×
179
            elif operator == "<=":
1✔
180
                if value > operand:
1✔
181
                    return False
1✔
182

183
        return True
1✔
184

185
    @staticmethod
1✔
186
    def flatten_pattern(nested_dict: dict) -> list[dict]:
1✔
187
        """
188
        Takes a dictionary as input and will output the dictionary on a single level.
189
        Input:
190
        `{"field1": {"field2": {"field3": "val1", "field4": "val2"}}}`
191
        Output:
192
        `[
193
            {
194
                "field1.field2.field3": "val1",
195
                "field1.field2.field4": "val2"
196
            }
197
        ]`
198
        Input with $or will create multiple outputs:
199
        `{"$or": [{"field1": "val1"}, {"field2": "val2"}], "field3": "val3"}`
200
        Output:
201
        `[
202
            {"field1": "val1", "field3": "val3"},
203
            {"field2": "val2", "field3": "val3"}
204
        ]`
205
        :param nested_dict: a (nested) dictionary
206
        :return: a list of flattened dictionaries with no nested dict or list inside, flattened to a
207
        single level, one list item for every list item encountered
208
        """
209

210
        def _traverse_event_pattern(obj, array=None, parent_key=None) -> list:
1✔
211
            if array is None:
1✔
212
                array = [{}]
1✔
213

214
            for key, values in obj.items():
1✔
215
                if key == "$or" and isinstance(values, list) and len(values) > 1:
1✔
216
                    # $or will create multiple new branches in the array.
217
                    # Each current branch will traverse with each choice in $or
218
                    array = [
1✔
219
                        i
220
                        for value in values
221
                        for i in _traverse_event_pattern(value, array, parent_key)
222
                    ]
223
                else:
224
                    # We update the parent key do that {"key1": {"key2": ""}} becomes "key1.key2"
225
                    _parent_key = f"{parent_key}.{key}" if parent_key else key
1✔
226
                    if isinstance(values, dict):
1✔
227
                        # If the current key has child dict -- key: "key1", child: {"key2": ["val1", val2"]}
228
                        # We only update the parent_key and traverse its children with the current branches
229
                        array = _traverse_event_pattern(values, array, _parent_key)
1✔
230
                    else:
231
                        # If the current key has no child, this means we found the values to match -- child: ["val1", val2"]
232
                        # we update the branches with the parent chain and the values -- {"key1.key2": ["val1, val2"]}
233
                        array = [{**item, _parent_key: values} for item in array]
1✔
234

235
            return array
1✔
236

237
        return _traverse_event_pattern(nested_dict)
1✔
238

239
    @staticmethod
1✔
240
    def flatten_payload(payload: dict, patterns: list[dict]) -> list[dict]:
1✔
241
        """
242
        Takes a dictionary as input and will output the dictionary on a single level.
243
        The dictionary can have lists containing other dictionaries, and one root level entry will be created for every
244
        item in a list if it corresponds to the entries of the patterns.
245
        Input:
246
        payload:
247
        `{"field1": {
248
            "field2: [
249
                {"field3: "val1", "field4": "val2"},
250
                {"field3: "val3", "field4": "val4"},
251
            }
252
        ]}`
253
        patterns:
254
        `[
255
            "field1.field2.field3": <condition>,
256
            "field1.field2.field4": <condition>,
257
        ]`
258
        Output:
259
        `[
260
            {
261
                "field1.field2.field3": "val1",
262
                "field1.field2.field4": "val2"
263
            },
264
            {
265
                "field1.field2.field3": "val3",
266
                "field1.field2.field4": "val4"
267
            },
268
        ]`
269
        :param payload: a (nested) dictionary, the event payload
270
        :param patterns: the flattened patterns from the EventPattern (see flatten_pattern)
271
        :return: flatten_dict: a dictionary with no nested dict inside, flattened to a single level
272
        """
273
        patterns_keys = {key for keys in patterns for key in keys}
1✔
274

275
        def _is_key_in_patterns(key: str) -> bool:
1✔
276
            return key is None or any(pattern_key.startswith(key) for pattern_key in patterns_keys)
1✔
277

278
        def _traverse(_object: dict, array=None, parent_key=None) -> list:
1✔
279
            if isinstance(_object, dict):
1✔
280
                for key, values in _object.items():
1✔
281
                    # We update the parent key so that {"key1": {"key2": ""}} becomes "key1.key2"
282
                    _parent_key = f"{parent_key}.{key}" if parent_key else key
1✔
283

284
                    # we make sure that we are building only the relevant parts of the payload related to the pattern
285
                    # the payload could be very complex, and the pattern only applies to part of it
286
                    if _is_key_in_patterns(_parent_key):
1✔
287
                        array = _traverse(values, array, _parent_key)
1✔
288

289
            elif isinstance(_object, list):
1✔
290
                if not _object:
1✔
291
                    return array
1✔
292
                array = [i for value in _object for i in _traverse(value, array, parent_key)]
1✔
293
            else:
294
                array = [{**item, parent_key: _object} for item in array]
1✔
295
            return array
1✔
296

297
        return _traverse(payload, array=[{}], parent_key=None)
1✔
298

299

300
class EventPatternCompiler:
1✔
301
    def __init__(self):
1✔
302
        self.error_prefix = "Event pattern is not valid. Reason: "
1✔
303

304
    def compile_event_pattern(self, event_pattern: str | dict) -> dict[str, t.Any]:
1✔
305
        if isinstance(event_pattern, str):
1✔
306
            try:
1✔
307
                event_pattern = json.loads(event_pattern)
1✔
308
                if not isinstance(event_pattern, dict):
1✔
309
                    raise InvalidEventPatternException(
1✔
310
                        f"{self.error_prefix}Filter is not an object"
311
                    )
312
            except json.JSONDecodeError:
1✔
313
                # this error message is not in parity, as it is tightly coupled to AWS parsing engine
314
                raise InvalidEventPatternException(f"{self.error_prefix}Filter is not valid JSON")
1✔
315

316
        aggregated_rules, combinations = self.aggregate_rules(event_pattern)
1✔
317

318
        for rules in aggregated_rules:
1✔
319
            for rule in rules:
1✔
320
                self._validate_rule(rule)
1✔
321

322
        return event_pattern
1✔
323

324
    def aggregate_rules(self, event_pattern: dict[str, t.Any]) -> tuple[list[list[t.Any]], int]:
1✔
325
        """
326
        This method evaluate the event pattern recursively, and returns only a list of lists of rules.
327
        It also calculates the combinations of rules, calculated depending on the nesting of the rules.
328
        Example:
329
        nested_event_pattern = {
330
            "key_a": {
331
                "key_b": {
332
                    "key_c": ["value_one", "value_two", "value_three", "value_four"]
333
                }
334
            },
335
            "key_d": {
336
                "key_e": ["value_one", "value_two", "value_three"]
337
            }
338
        }
339
        This function then iterates on the values of the top level keys of the event pattern: ("key_a", "key_d")
340
        If the iterated value is not a list, it means it is a nested property. If the scope is `MessageBody`, it is
341
        allowed, we call this method on the value, adding a level to the depth to keep track on how deep the key is.
342
        If the value is a list, it means it contains rules: we will append this list of rules in _rules, and
343
        calculate the combinations it adds.
344
        For the example event pattern containing nested properties, we calculate it this way
345
        The first array has four values in a three-level nested key, and the second has three values in a two-level
346
        nested key. 3 x 4 x 2 x 3 = 72
347
        The return value would be:
348
        [["value_one", "value_two", "value_three", "value_four"], ["value_one", "value_two", "value_three"]]
349
        It allows us to later iterate of the list of rules in an easy way, to verify its conditions only.
350

351
        :param event_pattern: a dict, starting at the Event Pattern
352
        :return: a tuple with a list of lists of rules and the calculated number of combinations
353
        """
354

355
        def _inner(
1✔
356
            pattern_elements: dict[str, t.Any], depth: int = 1, combinations: int = 1
357
        ) -> tuple[list[list[t.Any]], int]:
358
            _rules = []
1✔
359
            for key, _value in pattern_elements.items():
1✔
360
                if isinstance(_value, dict):
1✔
361
                    # From AWS docs: "unlike attribute-based policies, payload-based policies support property nesting."
362
                    sub_rules, combinations = _inner(
1✔
363
                        _value, depth=depth + 1, combinations=combinations
364
                    )
365
                    _rules.extend(sub_rules)
1✔
366
                elif isinstance(_value, list):
1✔
367
                    if not _value:
1✔
368
                        raise InvalidEventPatternException(
1✔
369
                            f"{self.error_prefix}Empty arrays are not allowed"
370
                        )
371

372
                    current_combination = 0
1✔
373
                    if key == "$or":
1✔
374
                        for val in _value:
1✔
375
                            sub_rules, or_combinations = _inner(
1✔
376
                                val, depth=depth, combinations=combinations
377
                            )
378
                            _rules.extend(sub_rules)
1✔
379
                            current_combination += or_combinations
1✔
380

381
                        combinations = current_combination
1✔
382
                    else:
383
                        _rules.append(_value)
1✔
384
                        combinations = combinations * len(_value) * depth
1✔
385
                else:
386
                    raise InvalidEventPatternException(
1✔
387
                        f'{self.error_prefix}"{key}" must be an object or an array'
388
                    )
389

390
            return _rules, combinations
1✔
391

392
        return _inner(event_pattern)
1✔
393

394
    def _validate_rule(self, rule: t.Any, from_: str | None = None) -> None:
1✔
395
        match rule:
1✔
396
            case None | str() | bool():
1✔
397
                return
1✔
398

399
            case int() | float():
1✔
400
                # TODO: AWS says they support only from -10^9 to 10^9 but seems to accept it, so we just return
401
                # if rule <= -1000000000 or rule >= 1000000000:
402
                #     raise ""
403
                return
1✔
404

405
            case {**kwargs}:
1✔
406
                if len(kwargs) != 1:
1✔
UNCOV
407
                    raise InvalidEventPatternException(
×
408
                        f"{self.error_prefix}Only one key allowed in match expression"
409
                    )
410

411
                operator, value = None, None
1✔
412
                for k, v in kwargs.items():
1✔
413
                    operator, value = k, v
1✔
414

415
                if operator in (
1✔
416
                    "prefix",
417
                    "suffix",
418
                ):
419
                    if from_ == "anything-but":
1✔
420
                        if isinstance(value, dict):
1✔
421
                            raise InvalidEventPatternException(
1✔
422
                                f"{self.error_prefix}Value of {from_} must be an array or single string/number value."
423
                            )
424

425
                        if not self._is_str_or_list_of_str(value):
1✔
426
                            raise InvalidEventPatternException(
1✔
427
                                f"{self.error_prefix}prefix/suffix match pattern must be a string"
428
                            )
429
                        elif not value:
1✔
430
                            raise InvalidEventPatternException(
1✔
431
                                f"{self.error_prefix}Null prefix/suffix not allowed"
432
                            )
433

434
                    elif isinstance(value, dict):
1✔
435
                        for inner_operator in value.keys():
1✔
436
                            if inner_operator != "equals-ignore-case":
1✔
UNCOV
437
                                raise InvalidEventPatternException(
×
438
                                    f"{self.error_prefix}Unsupported anything-but pattern: {inner_operator}"
439
                                )
440

441
                    elif not isinstance(value, str):
1✔
442
                        raise InvalidEventPatternException(
1✔
443
                            f"{self.error_prefix}{operator} match pattern must be a string"
444
                        )
445
                    return
1✔
446

447
                elif operator == "equals-ignore-case":
1✔
448
                    if from_ == "anything-but":
1✔
449
                        if not self._is_str_or_list_of_str(value):
1✔
450
                            raise InvalidEventPatternException(
1✔
451
                                f"{self.error_prefix}Inside {from_}/{operator} list, number|start|null|boolean is not supported."
452
                            )
453
                    elif not isinstance(value, str):
1✔
454
                        raise InvalidEventPatternException(
1✔
455
                            f"{self.error_prefix}{operator} match pattern must be a string"
456
                        )
457
                    return
1✔
458

459
                elif operator == "anything-but":
1✔
460
                    # anything-but can actually contain any kind of simple rule (str, number, and list)
461
                    if isinstance(value, list):
1✔
462
                        for v in value:
1✔
463
                            self._validate_rule(v)
1✔
464

465
                        return
1✔
466

467
                    # or have a nested `prefix`, `suffix` or `equals-ignore-case` pattern
468
                    elif isinstance(value, dict):
1✔
469
                        for inner_operator in value.keys():
1✔
470
                            if inner_operator not in (
1✔
471
                                "prefix",
472
                                "equals-ignore-case",
473
                                "suffix",
474
                                "wildcard",
475
                            ):
UNCOV
476
                                raise InvalidEventPatternException(
×
477
                                    f"{self.error_prefix}Unsupported anything-but pattern: {inner_operator}"
478
                                )
479

480
                    self._validate_rule(value, from_="anything-but")
1✔
481
                    return
1✔
482

483
                elif operator == "exists":
1✔
484
                    if not isinstance(value, bool):
1✔
UNCOV
485
                        raise InvalidEventPatternException(
×
486
                            f"{self.error_prefix}exists match pattern must be either true or false."
487
                        )
488
                    return
1✔
489

490
                elif operator == "numeric":
1✔
491
                    self._validate_numeric_condition(value)
1✔
492

493
                elif operator == "cidr":
1✔
494
                    self._validate_cidr_condition(value)
1✔
495

496
                elif operator == "wildcard":
1✔
497
                    if from_ == "anything-but" and isinstance(value, list):
1✔
498
                        for v in value:
1✔
499
                            self._validate_wildcard(v)
1✔
500
                    else:
501
                        self._validate_wildcard(value)
1✔
502

503
                else:
504
                    raise InvalidEventPatternException(
1✔
505
                        f"{self.error_prefix}Unrecognized match type {operator}"
506
                    )
507

UNCOV
508
            case _:
×
UNCOV
509
                raise InvalidEventPatternException(
×
510
                    f"{self.error_prefix}Match value must be String, number, true, false, or null"
511
                )
512

513
    def _validate_numeric_condition(self, value):
1✔
514
        if not isinstance(value, list):
1✔
515
            raise InvalidEventPatternException(
1✔
516
                f"{self.error_prefix}Value of numeric must be an array."
517
            )
518
        if not value:
1✔
UNCOV
519
            raise InvalidEventPatternException(
×
520
                f"{self.error_prefix}Invalid member in numeric match: ]"
521
            )
522
        num_values = value[::-1]
1✔
523

524
        operator = num_values.pop()
1✔
525
        if not isinstance(operator, str):
1✔
UNCOV
526
            raise InvalidEventPatternException(
×
527
                f"{self.error_prefix}Invalid member in numeric match: {operator}"
528
            )
529
        elif operator not in ("<", "<=", "=", ">", ">="):
1✔
530
            raise InvalidEventPatternException(
×
531
                f"{self.error_prefix}Unrecognized numeric range operator: {operator}"
532
            )
533

534
        value = num_values.pop() if num_values else None
1✔
535
        if not isinstance(value, (int, float)):
1✔
536
            exc_operator = "equals" if operator == "=" else operator
×
UNCOV
537
            raise InvalidEventPatternException(
×
538
                f"{self.error_prefix}Value of {exc_operator} must be numeric"
539
            )
540

541
        if not num_values:
1✔
542
            return
1✔
543

544
        if operator not in (">", ">="):
1✔
UNCOV
545
            raise InvalidEventPatternException(
×
546
                f"{self.error_prefix}Too many elements in numeric expression"
547
            )
548

549
        second_operator = num_values.pop()
1✔
550
        if not isinstance(second_operator, str):
1✔
UNCOV
551
            raise InvalidEventPatternException(
×
552
                f"{self.error_prefix}Bad value in numeric range: {second_operator}"
553
            )
554
        elif second_operator not in ("<", "<="):
1✔
555
            raise InvalidEventPatternException(
1✔
556
                f"{self.error_prefix}Bad numeric range operator: {second_operator}"
557
            )
558

559
        second_value = num_values.pop() if num_values else None
1✔
560
        if not isinstance(second_value, (int, float)):
1✔
561
            exc_operator = "equals" if second_operator == "=" else second_operator
1✔
562
            raise InvalidEventPatternException(
1✔
563
                f"{self.error_prefix}Value of {exc_operator} must be numeric"
564
            )
565

566
        elif second_value <= value:
1✔
UNCOV
567
            raise InvalidEventPatternException(f"{self.error_prefix}Bottom must be less than top")
×
568

569
        elif num_values:
1✔
UNCOV
570
            raise InvalidEventPatternException(
×
571
                f"{self.error_prefix}Too many terms in numeric range expression"
572
            )
573

574
    def _validate_wildcard(self, value: t.Any):
1✔
575
        if not isinstance(value, str):
1✔
576
            raise InvalidEventPatternException(
1✔
577
                f"{self.error_prefix}wildcard match pattern must be a string"
578
            )
579
        # TODO: properly calculate complexity of wildcard
580
        # https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-create-pattern-operators.html#eb-filtering-wildcard-matching-complexity
581
        # > calculate complexity of repeating character sequences that occur after a wildcard character
582
        if "**" in value:
1✔
583
            raise InvalidEventPatternException(
1✔
584
                f"{self.error_prefix}Consecutive wildcard characters at pos {value.index('**') + 1}"
585
            )
586

587
        if value.count("*") > 5:
1✔
588
            raise InvalidEventPatternException(
1✔
589
                f"{self.error_prefix}Rule is too complex - try using fewer wildcard characters or fewer repeating character sequences after a wildcard character"
590
            )
591

592
    def _validate_cidr_condition(self, value):
1✔
593
        if not isinstance(value, str):
1✔
594
            # `cidr` returns the prefix error
595
            raise InvalidEventPatternException(
1✔
596
                f"{self.error_prefix}prefix match pattern must be a string"
597
            )
598
        ip_and_mask = value.split("/")
1✔
599
        if len(ip_and_mask) != 2:
1✔
600
            raise InvalidEventPatternException(
1✔
601
                f"{self.error_prefix}Malformed CIDR, one '/' required"
602
            )
603
        ip_addr, mask = value.split("/")
1✔
604
        try:
1✔
605
            int(mask)
1✔
606
        except ValueError:
1✔
607
            raise InvalidEventPatternException(
1✔
608
                f"{self.error_prefix}Malformed CIDR, mask bits must be an integer"
609
            )
610
        try:
1✔
611
            ipaddress.ip_network(value)
1✔
612
        except ValueError:
1✔
613
            raise InvalidEventPatternException(
1✔
614
                f"{self.error_prefix}Nonstandard IP address: {ip_addr}"
615
            )
616

617
    @staticmethod
1✔
618
    def _is_str_or_list_of_str(value: t.Any) -> bool:
1✔
619
        if not isinstance(value, (str, list)):
1✔
620
            return False
1✔
621
        if isinstance(value, list) and not all(isinstance(v, str) for v in value):
1✔
622
            return False
1✔
623

624
        return True
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc