• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 858585c6-6e27-4bca-a3b8-78d6d978f737

25 Mar 2025 06:43PM UTC coverage: 86.88% (+0.02%) from 86.865%
858585c6-6e27-4bca-a3b8-78d6d978f737

push

circleci

web-flow
[Utils] Add a batch policy utility (#12430)

53 of 56 new or added lines in 1 file covered. (94.64%)

227 existing lines in 12 files now uncovered.

63251 of 72803 relevant lines covered (86.88%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.85
/localstack-core/localstack/services/lambda_/invocation/counting_service.py
1
import contextlib
1✔
2
import logging
1✔
3
from collections import defaultdict
1✔
4
from threading import RLock
1✔
5

6
from localstack import config
1✔
7
from localstack.aws.api.lambda_ import TooManyRequestsException
1✔
8
from localstack.services.lambda_.invocation.lambda_models import (
1✔
9
    Function,
10
    FunctionVersion,
11
    InitializationType,
12
)
13
from localstack.services.lambda_.invocation.models import lambda_stores
1✔
14
from localstack.utils.lambda_debug_mode.lambda_debug_mode import (
1✔
15
    is_lambda_debug_enabled_for,
16
)
17

18
# Module-level logger, named after this module via __name__
LOG = logging.getLogger(__name__)
1✔
19

20

21
class ConcurrencyTracker:
    """Counts the executions currently in flight, keyed by lock scope.

    Which scope key is used depends on the provisioning type (on-demand or provisioned):
    * on-demand concurrency is counted per function: unqualified arn ending with my-function
    * provisioned concurrency is counted per function version: qualified arn ending with my-function:1
    """

    # Maps a lock scope to its current number of concurrent executions
    concurrent_executions: dict[str, int]
    # Guards updates of the concurrent executions counter
    lock: RLock

    def __init__(self):
        self.lock = RLock()
        # A defaultdict makes every unseen scope read as zero
        self.concurrent_executions = defaultdict(int)

    def increment(self, scope: str) -> None:
        """Add one execution to ``scope``; this method does not acquire ``lock`` itself."""
        counters = self.concurrent_executions
        counters[scope] = counters[scope] + 1

    def atomic_decrement(self, scope: str):
        """Remove one execution from ``scope`` while holding ``lock``."""
        self.lock.acquire()
        try:
            self.decrement(scope)
        finally:
            self.lock.release()

    def decrement(self, scope: str) -> None:
        """Remove one execution from ``scope``; this method does not acquire ``lock`` itself."""
        counters = self.concurrent_executions
        counters[scope] = counters[scope] - 1
1✔
46

47

48
def calculate_provisioned_concurrency_sum(function: Function) -> int:
    """Returns the total provisioned concurrency for a given function, including all versions.

    Sums ``provisioned_concurrent_executions`` over every entry in
    ``function.provisioned_concurrency_configs``; the result is 0 when there are no entries.
    """
    # A generator expression avoids building an intermediate list just to sum it
    return sum(
        provisioned_config.provisioned_concurrent_executions
        for provisioned_config in function.provisioned_concurrency_configs.values()
    )
1✔
57

58

59
class CountingService:
    """
    The CountingService enforces quota limits per region and account in get_invocation_lease()
    for every Lambda invocation. It uses separate ConcurrencyTrackers for on-demand and provisioned concurrency
    to keep track of the number of concurrent invocations.

    Concurrency limits are per region and account:
    https://repost.aws/knowledge-center/lambda-concurrency-limit-increase
    https://docs.aws.amazon.com/lambda/latest/dg/lambda-concurrency.html
    https://docs.aws.amazon.com/lambda/latest/dg/monitoring-concurrency.html
    """

    # (account, region) => ConcurrencyTracker (unqualified arn) => concurrent executions
    on_demand_concurrency_trackers: dict[tuple[str, str], ConcurrencyTracker]
    # Lock for safely initializing new on-demand concurrency trackers
    on_demand_init_lock: RLock

    # (account, region) => ConcurrencyTracker (qualified arn) => concurrent executions
    provisioned_concurrency_trackers: dict[tuple[str, str], ConcurrencyTracker]
    # Lock for safely initializing new provisioned concurrency trackers
    provisioned_concurrency_init_lock: RLock

    def __init__(self):
        self.on_demand_concurrency_trackers = {}
        self.on_demand_init_lock = RLock()
        self.provisioned_concurrency_trackers = {}
        self.provisioned_concurrency_init_lock = RLock()

    @staticmethod
    def _get_or_create_tracker(
        trackers: dict[tuple[str, str], ConcurrencyTracker],
        init_lock: RLock,
        scope_tuple: tuple[str, str],
    ) -> ConcurrencyTracker:
        """Return the tracker stored under ``scope_tuple``, creating it on first use.

        The lookup is first attempted without ``init_lock``; only when no tracker is found is the
        lock taken and the lookup repeated before a new tracker is stored, so that two callers
        arriving at the same time end up with the same tracker instance.
        """
        tracker = trackers.get(scope_tuple)
        if tracker is None:
            with init_lock:
                # Re-check under the lock: another caller may have created it in the meantime
                tracker = trackers.get(scope_tuple)
                if tracker is None:
                    tracker = trackers[scope_tuple] = ConcurrencyTracker()
        return tracker

    @contextlib.contextmanager
    def get_invocation_lease(
        self, function: Function | None, function_version: FunctionVersion
    ) -> InitializationType:
        """An invocation lease reserves the right to schedule an invocation.
        The returned lease type can either be on-demand or provisioned.
        Scheduling preference:
        1) Check for free provisioned concurrency => provisioned
        2) Check for reserved concurrency => on-demand
        3) Check for unreserved concurrency => on-demand

        HACK: We allow the function to be None for Lambda@Edge to skip provisioned and reserved concurrency.

        This is a generator-based context manager: the lease type ("provisioned-concurrency" or
        "on-demand") is yielded to the ``with`` body, and the matching counter is decremented again
        when the ``with`` block exits, whether normally or through an exception.

        :param function: the function owning ``function_version``, or None (see HACK above)
        :param function_version: the version being invoked; its id supplies account, region and arns
        :raises TooManyRequestsException: if no concurrency is available for the invocation, or if
            Lambda Debug Mode is enabled for the arn and an invocation is already running
        """
        account = function_version.id.account
        region = function_version.id.region
        scope_tuple = (account, region)
        on_demand_tracker = self._get_or_create_tracker(
            self.on_demand_concurrency_trackers, self.on_demand_init_lock, scope_tuple
        )
        provisioned_tracker = self._get_or_create_tracker(
            self.provisioned_concurrency_trackers,
            self.provisioned_concurrency_init_lock,
            scope_tuple,
        )

        # TODO: check that we don't give a lease while updating provisioned concurrency
        # Potential challenge if an update happens in between reserving the lease here and actually assigning
        # * Increase provisioned: It could happen that we give a lease for provisioned-concurrency although
        # brand new provisioned environments are not yet initialized.
        # * Decrease provisioned: It could happen that we have running invocations that should still be counted
        # against the limit but they are not because we already updated the concurrency config to fewer envs.

        unqualified_function_arn = function_version.id.unqualified_arn()
        qualified_arn = function_version.id.qualified_arn()

        # Enforce one lease per ARN if the global flag is set
        if is_lambda_debug_enabled_for(qualified_arn):
            with provisioned_tracker.lock, on_demand_tracker.lock:
                on_demand_executions: int = on_demand_tracker.concurrent_executions[
                    unqualified_function_arn
                ]
                provisioned_executions = provisioned_tracker.concurrent_executions[qualified_arn]
                if on_demand_executions or provisioned_executions:
                    LOG.warning(
                        "Concurrent lambda invocations disabled for '%s' by Lambda Debug Mode",
                        qualified_arn,
                    )
                    raise TooManyRequestsException(
                        "Rate Exceeded.",
                        Reason="SingleLeaseEnforcement",
                        Type="User",
                    )

        lease_type = None
        # HACK: skip reserved and provisioned concurrency if function not available (e.g., in Lambda@Edge)
        if function is not None:
            with provisioned_tracker.lock:
                # 1) Check for free provisioned concurrency
                provisioned_concurrency_config = function.provisioned_concurrency_configs.get(
                    function_version.id.qualifier
                )
                if provisioned_concurrency_config:
                    available_provisioned_concurrency = (
                        provisioned_concurrency_config.provisioned_concurrent_executions
                        - provisioned_tracker.concurrent_executions[qualified_arn]
                    )
                    if available_provisioned_concurrency > 0:
                        provisioned_tracker.increment(qualified_arn)
                        lease_type = "provisioned-concurrency"

        if not lease_type:
            with on_demand_tracker.lock:
                # 2) If reserved concurrency is set AND no provisioned concurrency available:
                # => Check if enough reserved concurrency is available for the specific function.
                # HACK: skip reserved if function not available (e.g., in Lambda@Edge)
                if function and function.reserved_concurrent_executions is not None:
                    on_demand_running_invocation_count = on_demand_tracker.concurrent_executions[
                        unqualified_function_arn
                    ]
                    # Computed once so the logged value is the one the decision was based on
                    provisioned_concurrency_sum = calculate_provisioned_concurrency_sum(function)
                    available_reserved_concurrency = (
                        function.reserved_concurrent_executions
                        - provisioned_concurrency_sum
                        - on_demand_running_invocation_count
                    )
                    if available_reserved_concurrency > 0:
                        on_demand_tracker.increment(unqualified_function_arn)
                        lease_type = "on-demand"
                    else:
                        extras = {
                            "available_reserved_concurrency": available_reserved_concurrency,
                            "reserved_concurrent_executions": function.reserved_concurrent_executions,
                            "provisioned_concurrency_sum": provisioned_concurrency_sum,
                            "on_demand_running_invocation_count": on_demand_running_invocation_count,
                        }
                        LOG.debug("Insufficient reserved concurrency available: %s", extras)
                        raise TooManyRequestsException(
                            "Rate Exceeded.",
                            Reason="ReservedFunctionConcurrentInvocationLimitExceeded",
                            Type="User",
                        )
                # 3) If no reserved concurrency is set AND no provisioned concurrency available.
                # => Check the entire state within the scope of account and region.
                else:
                    # TODO: Consider a dedicated counter for unavailable concurrency with locks for updates on
                    #  reserved and provisioned concurrency if this is too slow
                    # The total concurrency allocated or used (i.e., unavailable concurrency) per account and region
                    total_used_concurrency = 0
                    store = lambda_stores[account][region]
                    for fn in store.functions.values():
                        if fn.reserved_concurrent_executions is not None:
                            # Reserved concurrency counts in full, whether or not it is in use
                            total_used_concurrency += fn.reserved_concurrent_executions
                        else:
                            fn_provisioned_concurrency = calculate_provisioned_concurrency_sum(fn)
                            total_used_concurrency += fn_provisioned_concurrency
                            fn_on_demand_concurrent_executions = (
                                on_demand_tracker.concurrent_executions[
                                    fn.latest().id.unqualified_arn()
                                ]
                            )
                            total_used_concurrency += fn_on_demand_concurrent_executions

                    available_unreserved_concurrency = (
                        config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS - total_used_concurrency
                    )
                    if available_unreserved_concurrency > 0:
                        on_demand_tracker.increment(unqualified_function_arn)
                        lease_type = "on-demand"
                    else:
                        if available_unreserved_concurrency < 0:
                            LOG.error(
                                "Invalid function concurrency state detected for function: %s | available unreserved concurrency: %d",
                                unqualified_function_arn,
                                available_unreserved_concurrency,
                            )
                        extras = {
                            "available_unreserved_concurrency": available_unreserved_concurrency,
                            "lambda_limits_concurrent_executions": config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS,
                            "total_used_concurrency": total_used_concurrency,
                        }
                        LOG.debug("Insufficient unreserved concurrency available: %s", extras)
                        # NOTE(review): this branch is the account/region-wide limit, yet it reports the
                        # same Reason as the reserved branch above. The AWS API also lists
                        # "ConcurrentInvocationLimitExceeded" as a Reason value - confirm which one
                        # AWS returns here before changing it.
                        raise TooManyRequestsException(
                            "Rate Exceeded.",
                            Reason="ReservedFunctionConcurrentInvocationLimitExceeded",
                            Type="User",
                        )
        try:
            yield lease_type
        finally:
            # Release the lease by decrementing the counter that was incremented above
            if lease_type == "provisioned-concurrency":
                provisioned_tracker.atomic_decrement(qualified_arn)
            elif lease_type == "on-demand":
                on_demand_tracker.atomic_decrement(unqualified_function_arn)
            else:
                LOG.error(
                    "Invalid lease type detected for function: %s: %s",
                    unqualified_function_arn,
                    lease_type,
                )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc