• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 106fdc6c-643c-4fdd-90db-acf252955759

25 Mar 2025 06:43PM UTC coverage: 86.862% (-0.02%) from 86.88%
106fdc6c-643c-4fdd-90db-acf252955759

push

circleci

web-flow
[Utils] Add a batch policy utility (#12430)

53 of 56 new or added lines in 1 file covered. (94.64%)

17 existing lines in 11 files now uncovered.

63238 of 72803 relevant lines covered (86.86%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.64
/localstack-core/localstack/utils/batch_policy.py
1
import copy
1✔
2
import time
1✔
3
from typing import Generic, List, Optional, TypeVar, overload
1✔
4

5
from pydantic import Field
1✔
6
from pydantic.dataclasses import dataclass
1✔
7

8
T = TypeVar("T")
1✔
9

10
# alias to signify whether a batch policy has been triggered
11
BatchPolicyTriggered = bool
1✔
12

13

14
# TODO: Add batching on bytes as well.
15
@dataclass
1✔
16
class Batcher(Generic[T]):
1✔
17
    """
18
    A utility for collecting items into batches and flushing them when one or more batch policy conditions are met.
19

20
    The batch policy can be created to trigger on:
21
    - max_count: Maximum number of items added
22
    - max_window: Maximum time window (in seconds)
23

24
    If no limits are specified, the batcher is always in triggered state.
25

26
    Example usage:
27

28
        import time
29

30
        # Triggers when 2 (or more) items are added
31
        batcher = Batcher(max_count=2)
32
        assert batcher.add(["item1", "item2", "item3"])
33
        assert batcher.flush() == ["item1", "item2", "item3"]
34

35
        # Triggers partially when 2 (or more) items are added
36
        batcher = Batcher(max_count=2)
37
        assert batcher.add(["item1", "item2", "item3"])
38
        assert batcher.flush(partial=True) == ["item1", "item2"]
39
        assert batcher.add("item4")
40
        assert batcher.flush(partial=True) == ["item3", "item4"]
41

42
        # Trigger 2 seconds after the first add
43
        batcher = Batcher(max_window=2.0)
44
        assert not batcher.add(["item1", "item2", "item3"])
45
        time.sleep(2.1)
46
        assert not batcher.add(["item4"])
47
        assert batcher.flush() == ["item1", "item2", "item3", "item4"]
48
    """
49

50
    max_count: Optional[int] = Field(default=None, description="Maximum number of items", ge=0)
1✔
51
    max_window: Optional[float] = Field(
1✔
52
        default=None, description="Maximum time window in seconds", ge=0
53
    )
54

55
    _triggered: bool = Field(default=False, init=False)
1✔
56
    _last_batch_time: float = Field(default_factory=time.monotonic, init=False)
1✔
57
    _batch: list[T] = Field(default_factory=list, init=False)
1✔
58

59
    @property
1✔
60
    def period(self) -> float:
1✔
61
        return time.monotonic() - self._last_batch_time
1✔
62

63
    def _check_batch_policy(self) -> bool:
1✔
64
        """Check if any batch policy conditions are met"""
65
        if self.max_count is not None and len(self._batch) >= self.max_count:
1✔
66
            self._triggered = True
1✔
67
        elif self.max_window is not None and self.period >= self.max_window:
1✔
68
            self._triggered = True
1✔
69
        elif not self.max_count and not self.max_window:
1✔
70
            # always return true
71
            self._triggered = True
1✔
72

73
        return self._triggered
1✔
74

75
    @overload
1✔
76
    def add(self, item: T, *, deep_copy: bool = False) -> BatchPolicyTriggered: ...
1✔
77

78
    @overload
1✔
79
    def add(self, items: List[T], *, deep_copy: bool = False) -> BatchPolicyTriggered: ...
1✔
80

81
    def add(self, item_or_items: T | list[T], *, deep_copy: bool = False) -> BatchPolicyTriggered:
1✔
82
        """
83
        Add an item or list of items to the collected batch.
84

85
        Returns:
86
            BatchPolicyTriggered: True if the batch policy was triggered during addition, False otherwise.
87
        """
88
        if deep_copy:
1✔
89
            item_or_items = copy.deepcopy(item_or_items)
1✔
90

91
        if isinstance(item_or_items, list):
1✔
92
            self._batch.extend(item_or_items)
1✔
93
        else:
94
            self._batch.append(item_or_items)
1✔
95

96
        # Check if the last addition triggered the batch policy
97
        return self.is_triggered()
1✔
98

99
    def flush(self, *, partial=False) -> list[T]:
1✔
100
        result = []
1✔
101
        if not partial or not self.max_count:
1✔
102
            result = self._batch.copy()
1✔
103
            self._batch.clear()
1✔
104
        else:
105
            batch_size = min(self.max_count, len(self._batch))
1✔
106
            result = self._batch[:batch_size].copy()
1✔
107
            self._batch = self._batch[batch_size:]
1✔
108

109
        self._last_batch_time = time.monotonic()
1✔
110
        self._triggered = False
1✔
111
        self._check_batch_policy()
1✔
112

113
        return result
1✔
114

115
    def duration_until_next_batch(self) -> float:
1✔
NEW
116
        if not self.max_window:
×
NEW
117
            return -1
×
NEW
118
        return max(self.max_window - self.period, -1)
×
119

120
    def get_current_size(self) -> int:
1✔
121
        return len(self._batch)
1✔
122

123
    def is_triggered(self):
1✔
124
        return self._triggered or self._check_batch_policy()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc