• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20942662173

12 Jan 2026 04:45PM UTC coverage: 86.905% (-0.03%) from 86.936%
20942662173

push

github

web-flow
Allow authenticated pull and push of docker images (#13569)

34 of 51 new or added lines in 4 files covered. (66.67%)

247 existing lines in 15 files now uncovered.

70218 of 80799 relevant lines covered (86.9%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.33
/localstack-core/localstack/services/lambda_/invocation/counting_service.py
1
import contextlib
1✔
2
import logging
1✔
3
from collections import defaultdict
1✔
4
from collections.abc import Iterator
1✔
5
from threading import RLock
1✔
6

7
from localstack import config
1✔
8
from localstack.aws.api.lambda_ import TooManyRequestsException
1✔
9
from localstack.services.lambda_.invocation.lambda_models import (
1✔
10
    Function,
11
    FunctionVersion,
12
    InitializationType,
13
)
14
from localstack.services.lambda_.invocation.models import lambda_stores
1✔
15

16
LOG = logging.getLogger(__name__)
1✔
17

18

19
class ConcurrencyTracker:
1✔
20
    """Keeps track of the number of concurrent executions per lock scope (e.g., per function or function version).
21
    The lock scope depends on the provisioning type (i.e., on-demand or provisioned):
22
    * on-demand concurrency per function: unqualified arn ending with my-function
23
    * provisioned concurrency per function version: qualified arn ending with my-function:1
24
    """
25

26
    # Lock scope => concurrent executions counter
27
    concurrent_executions: dict[str, int]
1✔
28
    # Lock for safely updating the concurrent executions counter
29
    lock: RLock
1✔
30

31
    def __init__(self):
1✔
32
        self.concurrent_executions = defaultdict(int)
1✔
33
        self.lock = RLock()
1✔
34

35
    def increment(self, scope: str) -> None:
1✔
36
        self.concurrent_executions[scope] += 1
1✔
37

38
    def atomic_decrement(self, scope: str):
1✔
39
        with self.lock:
1✔
40
            self.decrement(scope)
1✔
41

42
    def decrement(self, scope: str) -> None:
1✔
43
        self.concurrent_executions[scope] -= 1
1✔
44

45

46
def calculate_provisioned_concurrency_sum(function: Function) -> int:
1✔
47
    """Returns the total provisioned concurrency for a given function, including all versions."""
48
    provisioned_concurrency_sum_for_fn = sum(
1✔
49
        [
50
            provisioned_configs.provisioned_concurrent_executions
51
            for provisioned_configs in function.provisioned_concurrency_configs.values()
52
        ]
53
    )
54
    return provisioned_concurrency_sum_for_fn
1✔
55

56

57
class CountingService:
1✔
58
    """
59
    The CountingService enforces quota limits per region and account in get_invocation_lease()
60
    for every Lambda invocation. It uses separate ConcurrencyTrackers for on-demand and provisioned concurrency
61
    to keep track of the number of concurrent invocations.
62

63
    Concurrency limits are per region and account:
64
    https://repost.aws/knowledge-center/lambda-concurrency-limit-increase
65
    https://docs.aws.amazon.com/lambda/latest/dg/lambda-concurrency.htm
66
    https://docs.aws.amazon.com/lambda/latest/dg/monitoring-concurrency.html
67
    """
68

69
    # (account, region) => ConcurrencyTracker (unqualified arn) => concurrent executions
70
    on_demand_concurrency_trackers: dict[tuple[str, str], ConcurrencyTracker]
1✔
71
    # Lock for safely initializing new on-demand concurrency trackers
72
    on_demand_init_lock: RLock
1✔
73

74
    # (account, region) => ConcurrencyTracker (qualified arn) => concurrent executions
75
    provisioned_concurrency_trackers: dict[tuple[str, str], ConcurrencyTracker]
1✔
76
    # Lock for safely initializing new provisioned concurrency trackers
77
    provisioned_concurrency_init_lock: RLock
1✔
78

79
    def __init__(self):
1✔
80
        self.on_demand_concurrency_trackers = {}
1✔
81
        self.on_demand_init_lock = RLock()
1✔
82
        self.provisioned_concurrency_trackers = {}
1✔
83
        self.provisioned_concurrency_init_lock = RLock()
1✔
84

85
    @contextlib.contextmanager
1✔
86
    def get_invocation_lease(
1✔
87
        self, function: Function | None, function_version: FunctionVersion
88
    ) -> Iterator[InitializationType]:
89
        """An invocation lease reserves the right to schedule an invocation.
90
        The returned lease type can either be on-demand or provisioned.
91
        Scheduling preference:
92
        1) Check for free provisioned concurrency => provisioned
93
        2) Check for reserved concurrency => on-demand
94
        3) Check for unreserved concurrency => on-demand
95

96
        HACK: We allow the function to be None for Lambda@Edge to skip provisioned and reserved concurrency.
97
        """
98
        account = function_version.id.account
1✔
99
        region = function_version.id.region
1✔
100
        scope_tuple = (account, region)
1✔
101
        on_demand_tracker = self.on_demand_concurrency_trackers.get(scope_tuple)
1✔
102
        # Double-checked locking pattern to initialize an on-demand concurrency tracker if it does not exist
103
        if not on_demand_tracker:
1✔
104
            with self.on_demand_init_lock:
1✔
105
                on_demand_tracker = self.on_demand_concurrency_trackers.get(scope_tuple)
1✔
106
                if not on_demand_tracker:
1✔
107
                    on_demand_tracker = self.on_demand_concurrency_trackers[scope_tuple] = (
1✔
108
                        ConcurrencyTracker()
109
                    )
110

111
        provisioned_tracker = self.provisioned_concurrency_trackers.get(scope_tuple)
1✔
112
        # Double-checked locking pattern to initialize a provisioned concurrency tracker if it does not exist
113
        if not provisioned_tracker:
1✔
114
            with self.provisioned_concurrency_init_lock:
1✔
115
                provisioned_tracker = self.provisioned_concurrency_trackers.get(scope_tuple)
1✔
116
                if not provisioned_tracker:
1✔
117
                    provisioned_tracker = self.provisioned_concurrency_trackers[scope_tuple] = (
1✔
118
                        ConcurrencyTracker()
119
                    )
120

121
        # TODO: check that we don't give a lease while updating provisioned concurrency
122
        # Potential challenge if an update happens in between reserving the lease here and actually assigning
123
        # * Increase provisioned: It could happen that we give a lease for provisioned-concurrency although
124
        # brand new provisioned environments are not yet initialized.
125
        # * Decrease provisioned: It could happen that we have running invocations that should still be counted
126
        # against the limit but they are not because we already updated the concurrency config to fewer envs.
127

128
        unqualified_function_arn = function_version.id.unqualified_arn()
1✔
129
        qualified_arn = function_version.id.qualified_arn()
1✔
130

131
        lease_type = None
1✔
132
        # HACK: skip reserved and provisioned concurrency if function not available (e.g., in Lambda@Edge)
133
        if function is not None:
1✔
134
            with provisioned_tracker.lock:
1✔
135
                # 1) Check for free provisioned concurrency
136
                provisioned_concurrency_config = function.provisioned_concurrency_configs.get(
1✔
137
                    function_version.id.qualifier
138
                )
139
                if not provisioned_concurrency_config:
1✔
140
                    # check if any aliases point to the current version, and check the provisioned concurrency config
141
                    # for them. There can be only one config for a version, not matter if defined on the alias or version itself.
142
                    for alias in function.aliases.values():
1✔
143
                        if alias.function_version == function_version.id.qualifier:
1✔
144
                            provisioned_concurrency_config = (
1✔
145
                                function.provisioned_concurrency_configs.get(alias.name)
146
                            )
147
                            break
1✔
148
                if provisioned_concurrency_config:
1✔
149
                    available_provisioned_concurrency = (
1✔
150
                        provisioned_concurrency_config.provisioned_concurrent_executions
151
                        - provisioned_tracker.concurrent_executions[qualified_arn]
152
                    )
153
                    if available_provisioned_concurrency > 0:
1✔
154
                        provisioned_tracker.increment(qualified_arn)
1✔
155
                        lease_type = InitializationType.provisioned_concurrency
1✔
156

157
        if not lease_type:
1✔
158
            with on_demand_tracker.lock:
1✔
159
                # 2) If reserved concurrency is set AND no provisioned concurrency available:
160
                # => Check if enough reserved concurrency is available for the specific function.
161
                # HACK: skip reserved if function not available (e.g., in Lambda@Edge)
162
                if function and function.reserved_concurrent_executions is not None:
1✔
163
                    on_demand_running_invocation_count = on_demand_tracker.concurrent_executions[
1✔
164
                        unqualified_function_arn
165
                    ]
166
                    available_reserved_concurrency = (
1✔
167
                        function.reserved_concurrent_executions
168
                        - calculate_provisioned_concurrency_sum(function)
169
                        - on_demand_running_invocation_count
170
                    )
171
                    if available_reserved_concurrency > 0:
1✔
172
                        on_demand_tracker.increment(unqualified_function_arn)
1✔
173
                        lease_type = InitializationType.on_demand
1✔
174
                    else:
175
                        extras = {
1✔
176
                            "available_reserved_concurrency": available_reserved_concurrency,
177
                            "reserved_concurrent_executions": function.reserved_concurrent_executions,
178
                            "provisioned_concurrency_sum": calculate_provisioned_concurrency_sum(
179
                                function
180
                            ),
181
                            "on_demand_running_invocation_count": on_demand_running_invocation_count,
182
                        }
183
                        LOG.debug("Insufficient reserved concurrency available: %s", extras)
1✔
184
                        raise TooManyRequestsException(
1✔
185
                            "Rate Exceeded.",
186
                            Reason="ReservedFunctionConcurrentInvocationLimitExceeded",
187
                            Type="User",
188
                        )
189
                # 3) If no reserved concurrency is set AND no provisioned concurrency available.
190
                # => Check the entire state within the scope of account and region.
191
                else:
192
                    # TODO: Consider a dedicated counter for unavailable concurrency with locks for updates on
193
                    #  reserved and provisioned concurrency if this is too slow
194
                    # The total concurrency allocated or used (i.e., unavailable concurrency) per account and region
195
                    total_used_concurrency = 0
1✔
196
                    store = lambda_stores[account][region]
1✔
197
                    for fn in store.functions.values():
1✔
198
                        if fn.reserved_concurrent_executions is not None:
1✔
UNCOV
199
                            total_used_concurrency += fn.reserved_concurrent_executions
×
200
                        else:
201
                            fn_provisioned_concurrency = calculate_provisioned_concurrency_sum(fn)
1✔
202
                            total_used_concurrency += fn_provisioned_concurrency
1✔
203
                            fn_on_demand_concurrent_executions = (
1✔
204
                                on_demand_tracker.concurrent_executions[
205
                                    fn.latest().id.unqualified_arn()
206
                                ]
207
                            )
208
                            total_used_concurrency += fn_on_demand_concurrent_executions
1✔
209

210
                    available_unreserved_concurrency = (
1✔
211
                        config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS - total_used_concurrency
212
                    )
213
                    if available_unreserved_concurrency > 0:
1✔
214
                        on_demand_tracker.increment(unqualified_function_arn)
1✔
215
                        lease_type = InitializationType.on_demand
1✔
216
                    else:
217
                        if available_unreserved_concurrency < 0:
×
UNCOV
218
                            LOG.error(
×
219
                                "Invalid function concurrency state detected for function: %s | available unreserved concurrency: %d",
220
                                unqualified_function_arn,
221
                                available_unreserved_concurrency,
222
                            )
UNCOV
223
                        extras = {
×
224
                            "available_unreserved_concurrency": available_unreserved_concurrency,
225
                            "lambda_limits_concurrent_executions": config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS,
226
                            "total_used_concurrency": total_used_concurrency,
227
                        }
228
                        LOG.debug("Insufficient unreserved concurrency available: %s", extras)
×
UNCOV
229
                        raise TooManyRequestsException(
×
230
                            "Rate Exceeded.",
231
                            Reason="ReservedFunctionConcurrentInvocationLimitExceeded",
232
                            Type="User",
233
                        )
234
        try:
1✔
235
            yield lease_type
1✔
236
        finally:
237
            if lease_type == InitializationType.provisioned_concurrency:
1✔
238
                provisioned_tracker.atomic_decrement(qualified_arn)
1✔
239
            elif lease_type == InitializationType.on_demand:
1✔
240
                on_demand_tracker.atomic_decrement(unqualified_function_arn)
1✔
241
            else:
UNCOV
242
                LOG.error(
×
243
                    "Invalid lease type detected for function: %s: %s",
244
                    unqualified_function_arn,
245
                    lease_type,
246
                )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc