• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

LeanderCS / flask-inputfilter / #434

11 Aug 2025 10:38AM UTC coverage: 92.953% (-1.8%) from 94.8%
#434

push

coveralls-python

LeanderCS
Optimize performance for heavy processing

39 of 86 new or added lines in 9 files covered. (45.35%)

3 existing lines in 3 files now uncovered.

2005 of 2157 relevant lines covered (92.95%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

28.13
/flask_inputfilter/filters/lazy_filter_chain.py
1
from __future__ import annotations
1✔
2

3
from typing import TYPE_CHECKING, Any, Generator
1✔
4

5
if TYPE_CHECKING:
1✔
NEW
6
    from flask_inputfilter.models import BaseFilter
×
7

8

9
class LazyFilterChain:
    """
    A lazy evaluation wrapper for filter chains that delays processing until
    needed.

    This can significantly improve performance when dealing with large filter
    chains or when early filters might invalidate the need for later
    processing.

    Nothing is applied at construction time. ``get_result`` runs the chain
    on its first call and caches the outcome; ``apply_until`` re-runs the
    chain from the original value on every call and stops as soon as the
    supplied condition holds. In both cases a ``None`` value short-circuits
    the remaining filters.
    """

    __slots__ = ("_filters", "_processed", "_result", "_value")

    def __init__(self, filters: list[BaseFilter], value: Any) -> None:
        """
        Store the filters and the input value without applying anything.

        Args:
            filters: Objects exposing ``apply(value)``; they are applied in
                list order. The list is kept by reference, not copied.
            value: The raw value fed into the first filter.
        """
        self._filters = filters
        self._value = value
        # ``_processed`` guards the cache so that a legitimately computed
        # ``None`` result is not mistaken for "not computed yet".
        self._processed = False
        self._result = None

    def _process(self) -> Generator[Any, None, None]:
        """
        Generator that lazily applies filters.

        Yields the value after each applied filter. If the value is ``None``
        before a filter would run, a single ``None`` is yielded and the
        generator stops without calling the remaining filters. An empty
        filter list yields nothing.
        """
        current_value = self._value
        for current_filter in self._filters:
            if current_value is None:
                yield None
                return
            current_value = current_filter.apply(current_value)
            yield current_value

    def get_result(self) -> Any:
        """
        Get the final result, processing filters only when needed.

        The chain is executed at most once per instance; later calls return
        the cached result. With an empty filter list the original value is
        returned unchanged.

        Returns:
            The value after the last applied filter, ``None`` if the value
            became ``None`` along the way, or the original value if there
            are no filters.
        """
        if not self._processed:
            # Seed with the untouched input: ``_process`` yields nothing for
            # an empty chain, and the result must then be the input itself
            # rather than the ``None`` placeholder set in ``__init__``.
            result = self._value
            for step_value in self._process():
                result = step_value
            self._result = result
            self._processed = True
        return self._result

    def apply_until(self, condition: callable) -> Any:
        """
        Apply filters until a condition is met.

        This allows for early termination of filter processing. The
        condition is evaluated on the current value *before* each filter is
        applied, so the original value itself may already satisfy it. The
        outcome is not cached and does not affect ``get_result``.

        Args:
            condition: Callable taking the current value and returning a
                truthy value when processing should stop.

        Returns:
            The first value that is ``None`` or satisfies ``condition``, or
            the fully filtered value if neither happens.
        """
        current_value = self._value
        for current_filter in self._filters:
            if current_value is None or condition(current_value):
                return current_value
            current_value = current_filter.apply(current_value)
        return current_value
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc