• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 22291461973

21 Feb 2026 09:33AM UTC coverage: 86.973% (+0.009%) from 86.964%
22291461973

push

github

web-flow
fix typo in Dockerfile (#13812)

69789 of 80242 relevant lines covered (86.97%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.33
/localstack-core/localstack/services/lambda_/invocation/counting_service.py
1
import contextlib
1✔
2
import logging
1✔
3
from collections import defaultdict
1✔
4
from collections.abc import Iterator
1✔
5
from threading import RLock
1✔
6

7
from localstack import config
1✔
8
from localstack.aws.api.lambda_ import ProvisionedConcurrencyStatusEnum, TooManyRequestsException
1✔
9
from localstack.services.lambda_.invocation.lambda_models import (
1✔
10
    Function,
11
    FunctionVersion,
12
    InitializationType,
13
    ProvisionedConcurrencyState,
14
)
15
from localstack.services.lambda_.invocation.models import lambda_stores
1✔
16

17
LOG = logging.getLogger(__name__)
1✔
18

19

20
class ConcurrencyTracker:
1✔
21
    """Keeps track of the number of concurrent executions per lock scope (e.g., per function or function version).
22
    The lock scope depends on the provisioning type (i.e., on-demand or provisioned):
23
    * on-demand concurrency per function: unqualified arn ending with my-function
24
    * provisioned concurrency per function version: qualified arn ending with my-function:1
25
    """
26

27
    # Lock scope => concurrent executions counter
28
    concurrent_executions: dict[str, int]
1✔
29
    # Lock for safely updating the concurrent executions counter
30
    lock: RLock
1✔
31

32
    def __init__(self):
1✔
33
        self.concurrent_executions = defaultdict(int)
1✔
34
        self.lock = RLock()
1✔
35

36
    def increment(self, scope: str) -> None:
1✔
37
        self.concurrent_executions[scope] += 1
1✔
38

39
    def atomic_decrement(self, scope: str):
1✔
40
        with self.lock:
1✔
41
            self.decrement(scope)
1✔
42

43
    def decrement(self, scope: str) -> None:
1✔
44
        self.concurrent_executions[scope] -= 1
1✔
45

46

47
def calculate_provisioned_concurrency_sum(function: Function) -> int:
1✔
48
    """Returns the total provisioned concurrency for a given function, including all versions."""
49
    provisioned_concurrency_sum_for_fn = sum(
1✔
50
        [
51
            provisioned_configs.provisioned_concurrent_executions
52
            for provisioned_configs in function.provisioned_concurrency_configs.values()
53
        ]
54
    )
55
    return provisioned_concurrency_sum_for_fn
1✔
56

57

58
class CountingService:
1✔
59
    """
60
    The CountingService enforces quota limits per region and account in get_invocation_lease()
61
    for every Lambda invocation. It uses separate ConcurrencyTrackers for on-demand and provisioned concurrency
62
    to keep track of the number of concurrent invocations.
63

64
    Concurrency limits are per region and account:
65
    https://repost.aws/knowledge-center/lambda-concurrency-limit-increase
66
    https://docs.aws.amazon.com/lambda/latest/dg/lambda-concurrency.htm
67
    https://docs.aws.amazon.com/lambda/latest/dg/monitoring-concurrency.html
68
    """
69

70
    # (account, region) => ConcurrencyTracker (unqualified arn) => concurrent executions
71
    on_demand_concurrency_trackers: dict[tuple[str, str], ConcurrencyTracker]
1✔
72
    # Lock for safely initializing new on-demand concurrency trackers
73
    on_demand_init_lock: RLock
1✔
74

75
    # (account, region) => ConcurrencyTracker (qualified arn) => concurrent executions
76
    provisioned_concurrency_trackers: dict[tuple[str, str], ConcurrencyTracker]
1✔
77
    # Lock for safely initializing new provisioned concurrency trackers
78
    provisioned_concurrency_init_lock: RLock
1✔
79

80
    def __init__(self):
1✔
81
        self.on_demand_concurrency_trackers = {}
1✔
82
        self.on_demand_init_lock = RLock()
1✔
83
        self.provisioned_concurrency_trackers = {}
1✔
84
        self.provisioned_concurrency_init_lock = RLock()
1✔
85

86
    @contextlib.contextmanager
1✔
87
    def get_invocation_lease(
1✔
88
        self,
89
        function: Function | None,
90
        function_version: FunctionVersion,
91
        provisioned_state: ProvisionedConcurrencyState | None = None,
92
    ) -> Iterator[InitializationType]:
93
        """An invocation lease reserves the right to schedule an invocation.
94
        The returned lease type can either be on-demand or provisioned.
95
        Scheduling preference:
96
        1) Check for free provisioned concurrency => provisioned
97
        2) Check for reserved concurrency => on-demand
98
        3) Check for unreserved concurrency => on-demand
99

100
        HACK: We allow the function to be None for Lambda@Edge to skip provisioned and reserved concurrency.
101
        """
102
        account = function_version.id.account
1✔
103
        region = function_version.id.region
1✔
104
        scope_tuple = (account, region)
1✔
105
        on_demand_tracker = self.on_demand_concurrency_trackers.get(scope_tuple)
1✔
106
        # Double-checked locking pattern to initialize an on-demand concurrency tracker if it does not exist
107
        if not on_demand_tracker:
1✔
108
            with self.on_demand_init_lock:
1✔
109
                on_demand_tracker = self.on_demand_concurrency_trackers.get(scope_tuple)
1✔
110
                if not on_demand_tracker:
1✔
111
                    on_demand_tracker = self.on_demand_concurrency_trackers[scope_tuple] = (
1✔
112
                        ConcurrencyTracker()
113
                    )
114

115
        provisioned_tracker = self.provisioned_concurrency_trackers.get(scope_tuple)
1✔
116
        # Double-checked locking pattern to initialize a provisioned concurrency tracker if it does not exist
117
        if not provisioned_tracker:
1✔
118
            with self.provisioned_concurrency_init_lock:
1✔
119
                provisioned_tracker = self.provisioned_concurrency_trackers.get(scope_tuple)
1✔
120
                if not provisioned_tracker:
1✔
121
                    provisioned_tracker = self.provisioned_concurrency_trackers[scope_tuple] = (
1✔
122
                        ConcurrencyTracker()
123
                    )
124

125
        unqualified_function_arn = function_version.id.unqualified_arn()
1✔
126
        qualified_arn = function_version.id.qualified_arn()
1✔
127

128
        lease_type = None
1✔
129
        # HACK: skip reserved and provisioned concurrency if function not available (e.g., in Lambda@Edge)
130
        if function is not None:
1✔
131
            with provisioned_tracker.lock:
1✔
132
                # 1) Check for free provisioned concurrency
133
                provisioned_concurrency_config = function.provisioned_concurrency_configs.get(
1✔
134
                    function_version.id.qualifier
135
                )
136
                if not provisioned_concurrency_config:
1✔
137
                    # check if any aliases point to the current version, and check the provisioned concurrency config
138
                    # for them. There can be only one config for a version, not matter if defined on the alias or version itself.
139
                    for alias in function.aliases.values():
1✔
140
                        if alias.function_version == function_version.id.qualifier:
1✔
141
                            provisioned_concurrency_config = (
1✔
142
                                function.provisioned_concurrency_configs.get(alias.name)
143
                            )
144
                            break
1✔
145
                # Favor provisioned concurrency if configured and ready
146
                # TODO: test updating provisioned concurrency? Does AWS serve on-demand during updates?
147
                # Potential challenge if an update happens in between reserving the lease here and actually assigning
148
                # * Increase provisioned: It could happen that we give a lease for provisioned-concurrency although
149
                # brand new provisioned environments are not yet initialized.
150
                # * Decrease provisioned: It could happen that we have running invocations that should still be counted
151
                # against the limit but they are not because we already updated the concurrency config to fewer envs.
152
                if (
1✔
153
                    provisioned_concurrency_config
154
                    and provisioned_state.status == ProvisionedConcurrencyStatusEnum.READY
155
                ):
156
                    available_provisioned_concurrency = (
1✔
157
                        provisioned_concurrency_config.provisioned_concurrent_executions
158
                        - provisioned_tracker.concurrent_executions[qualified_arn]
159
                    )
160
                    if available_provisioned_concurrency > 0:
1✔
161
                        provisioned_tracker.increment(qualified_arn)
1✔
162
                        lease_type = InitializationType.provisioned_concurrency
1✔
163

164
        if not lease_type:
1✔
165
            with on_demand_tracker.lock:
1✔
166
                # 2) If reserved concurrency is set AND no provisioned concurrency available:
167
                # => Check if enough reserved concurrency is available for the specific function.
168
                # HACK: skip reserved if function not available (e.g., in Lambda@Edge)
169
                if function and function.reserved_concurrent_executions is not None:
1✔
170
                    on_demand_running_invocation_count = on_demand_tracker.concurrent_executions[
1✔
171
                        unqualified_function_arn
172
                    ]
173
                    available_reserved_concurrency = (
1✔
174
                        function.reserved_concurrent_executions
175
                        - calculate_provisioned_concurrency_sum(function)
176
                        - on_demand_running_invocation_count
177
                    )
178
                    if available_reserved_concurrency > 0:
1✔
179
                        on_demand_tracker.increment(unqualified_function_arn)
1✔
180
                        lease_type = InitializationType.on_demand
1✔
181
                    else:
182
                        extras = {
1✔
183
                            "available_reserved_concurrency": available_reserved_concurrency,
184
                            "reserved_concurrent_executions": function.reserved_concurrent_executions,
185
                            "provisioned_concurrency_sum": calculate_provisioned_concurrency_sum(
186
                                function
187
                            ),
188
                            "on_demand_running_invocation_count": on_demand_running_invocation_count,
189
                        }
190
                        LOG.debug("Insufficient reserved concurrency available: %s", extras)
1✔
191
                        raise TooManyRequestsException(
1✔
192
                            "Rate Exceeded.",
193
                            Reason="ReservedFunctionConcurrentInvocationLimitExceeded",
194
                            Type="User",
195
                        )
196
                # 3) If no reserved concurrency is set AND no provisioned concurrency available.
197
                # => Check the entire state within the scope of account and region.
198
                else:
199
                    # TODO: Consider a dedicated counter for unavailable concurrency with locks for updates on
200
                    #  reserved and provisioned concurrency if this is too slow
201
                    # The total concurrency allocated or used (i.e., unavailable concurrency) per account and region
202
                    total_used_concurrency = 0
1✔
203
                    store = lambda_stores[account][region]
1✔
204
                    for fn in store.functions.values():
1✔
205
                        if fn.reserved_concurrent_executions is not None:
1✔
206
                            total_used_concurrency += fn.reserved_concurrent_executions
×
207
                        else:
208
                            fn_provisioned_concurrency = calculate_provisioned_concurrency_sum(fn)
1✔
209
                            total_used_concurrency += fn_provisioned_concurrency
1✔
210
                            fn_on_demand_concurrent_executions = (
1✔
211
                                on_demand_tracker.concurrent_executions[
212
                                    fn.latest().id.unqualified_arn()
213
                                ]
214
                            )
215
                            total_used_concurrency += fn_on_demand_concurrent_executions
1✔
216

217
                    available_unreserved_concurrency = (
1✔
218
                        config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS - total_used_concurrency
219
                    )
220
                    if available_unreserved_concurrency > 0:
1✔
221
                        on_demand_tracker.increment(unqualified_function_arn)
1✔
222
                        lease_type = InitializationType.on_demand
1✔
223
                    else:
224
                        if available_unreserved_concurrency < 0:
×
225
                            LOG.error(
×
226
                                "Invalid function concurrency state detected for function: %s | available unreserved concurrency: %d",
227
                                unqualified_function_arn,
228
                                available_unreserved_concurrency,
229
                            )
230
                        extras = {
×
231
                            "available_unreserved_concurrency": available_unreserved_concurrency,
232
                            "lambda_limits_concurrent_executions": config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS,
233
                            "total_used_concurrency": total_used_concurrency,
234
                        }
235
                        LOG.debug("Insufficient unreserved concurrency available: %s", extras)
×
236
                        raise TooManyRequestsException(
×
237
                            "Rate Exceeded.",
238
                            Reason="ReservedFunctionConcurrentInvocationLimitExceeded",
239
                            Type="User",
240
                        )
241
        try:
1✔
242
            yield lease_type
1✔
243
        finally:
244
            if lease_type == InitializationType.provisioned_concurrency:
1✔
245
                provisioned_tracker.atomic_decrement(qualified_arn)
1✔
246
            elif lease_type == InitializationType.on_demand:
1✔
247
                on_demand_tracker.atomic_decrement(unqualified_function_arn)
1✔
248
            else:
249
                LOG.error(
×
250
                    "Invalid lease type detected for function: %s: %s",
251
                    unqualified_function_arn,
252
                    lease_type,
253
                )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc