• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 22836606749

06 Mar 2026 12:12PM UTC coverage: 86.961% (+0.02%) from 86.937%
22836606749

push

github

web-flow
Chore: More typing for utils/Files and utils/DockerUtils (#13677)

42 of 48 new or added lines in 5 files covered. (87.5%)

74 existing lines in 6 files now uncovered.

69889 of 80368 relevant lines covered (86.96%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.53
/localstack-core/localstack/services/lambda_/invocation/assignment.py
1
import contextlib
1✔
2
import logging
1✔
3
import threading
1✔
4
from collections import defaultdict
1✔
5
from collections.abc import Iterator
1✔
6
from concurrent.futures import Future, ThreadPoolExecutor
1✔
7

8
from localstack.services.lambda_.invocation.execution_environment import (
1✔
9
    EnvironmentStartupTimeoutException,
10
    ExecutionEnvironment,
11
    InvalidStatusException,
12
)
13
from localstack.services.lambda_.invocation.executor_endpoint import StatusErrorException
1✔
14
from localstack.services.lambda_.invocation.lambda_models import (
1✔
15
    FunctionVersion,
16
    InitializationType,
17
    OtherServiceEndpoint,
18
)
19

20
# Module-level logger, named after this module's import path.
LOG = logging.getLogger(__name__)
1✔
21

22

23
class AssignmentException(Exception):
    """Raised when no execution environment can be assigned or started for an invocation."""
1✔
25

26

27
class AssignmentService(OtherServiceEndpoint):
    """
    Keeps track of Lambda execution environments and hands them out to invocations.

    Environments are grouped per function-version manager id. On-demand environments are
    started lazily by `get_environment`; provisioned-concurrency environments are created
    ahead of time by `scale_provisioned_concurrency`.

    scope: LocalStack global
    """

    # function_version manager id => runtime_environment_id => runtime_environment
    environments: dict[str, dict[str, ExecutionEnvironment]]

    # Global pool for spawning and killing provisioned Lambda runtime environments
    provisioning_pool: ThreadPoolExecutor

    # Semaphore limiting the number of on-demand containers starting simultaneously.
    # Concurrent container starts are I/O-heavy (Docker API calls, copying runtime files)
    # and can exhaust OS file descriptor limits on machines with low ulimits.
    on_demand_start_semaphore: threading.Semaphore

    def __init__(self, *, max_concurrent_on_demand_starts: int = 16):
        """
        :param max_concurrent_on_demand_starts: upper bound on on-demand environments that
            may be starting at the same time; 16 is a conservative default.
            `threading.Semaphore` rejects negative values with a `ValueError`.
        """
        self.environments = defaultdict(dict)
        self.provisioning_pool = ThreadPoolExecutor(thread_name_prefix="lambda-provisioning-pool")
        # TODO: wire this value up to the LocalStack configuration
        self.on_demand_start_semaphore = threading.Semaphore(max_concurrent_on_demand_starts)

    @contextlib.contextmanager
    def get_environment(
        self,
        version_manager_id: str,
        function_version: FunctionVersion,
        provisioning_type: InitializationType,
    ) -> Iterator[ExecutionEnvironment]:
        """
        Context manager yielding a reserved execution environment for one invocation.

        An already registered environment of the requested `provisioning_type` is reused if
        one can be reserved. Otherwise a new on-demand environment is started and registered;
        for provisioned concurrency no new environment is created here.

        On a clean exit the environment is released. If the managed block raises, on-demand
        environments are stopped, other environments are released (and stopped only if the
        release fails), and the exception is re-raised.

        :param version_manager_id: id of the function-version manager owning the environments
        :param function_version: function version used when a new environment must be started
        :param provisioning_type: which kind of environment the caller holds a lease for
        :raises AssignmentException: if no provisioned-concurrency environment could be
            reserved, or if starting a new environment failed
        :raises ValueError: if `provisioning_type` is neither on-demand nor provisioned
        """
        # Snapshot the values list before iterating to avoid skipped entries
        # that can be caused by concurrent invocations
        applicable_envs = [
            env
            for env in list(self.environments[version_manager_id].values())
            if env.initialization_type == provisioning_type
        ]
        execution_environment = None
        for environment in applicable_envs:
            try:
                environment.reserve()
            except InvalidStatusException:
                # This candidate could not be reserved; try the next one.
                continue
            execution_environment = environment
            break

        if execution_environment is None:
            if provisioning_type == InitializationType.provisioned_concurrency:
                raise AssignmentException(
                    "No provisioned concurrency environment available despite lease."
                )
            elif provisioning_type == InitializationType.on_demand:
                # Only the start itself is throttled; registration and reservation are not.
                with self.on_demand_start_semaphore:
                    execution_environment = self.start_environment(
                        version_manager_id, function_version
                    )
                self.environments[version_manager_id][execution_environment.id] = (
                    execution_environment
                )
                execution_environment.reserve()
            else:
                raise ValueError(f"Invalid provisioning type {provisioning_type}")

        try:
            yield execution_environment
            execution_environment.release()
        except InvalidStatusException as invalid_e:
            # NOTE(review): this handler also catches an InvalidStatusException raised by the
            # managed block itself (not just by release()), and suppresses it for the caller.
            # Confirm that this is intended.
            LOG.error("InvalidStatusException: %s", invalid_e)
        except Exception as e:
            LOG.error(
                "Failed invocation <%s>: %s", type(e), e, exc_info=LOG.isEnabledFor(logging.DEBUG)
            )
            if execution_environment.initialization_type == InitializationType.on_demand:
                self.stop_environment(execution_environment)
            else:
                # Try to restore to READY rather than stopping.
                # Transient errors (e.g., OS-level connection failures) should not
                # permanently remove healthy provisioned containers from the pool.
                try:
                    execution_environment.release()
                except InvalidStatusException:
                    self.stop_environment(execution_environment)
            # Bare raise re-raises the invocation error with its original traceback.
            raise

    def start_environment(
        self, version_manager_id: str, function_version: FunctionVersion
    ) -> ExecutionEnvironment:
        """
        Create and start a new execution environment; the caller registers it.

        The environment is on-demand unless the function version has a capacity provider
        config, in which case it is marked as a Lambda managed instance.

        :raises StatusErrorException: propagated unchanged from the environment start
        :raises EnvironmentStartupTimeoutException: propagated unchanged from the start
        :raises AssignmentException: wraps any other error raised while starting
        """
        LOG.debug("Starting new environment")
        initialization_type = InitializationType.on_demand
        if function_version.config.capacity_provider_config:
            initialization_type = InitializationType.lambda_managed_instances
        execution_environment = ExecutionEnvironment(
            function_version=function_version,
            initialization_type=initialization_type,
            on_timeout=self.on_timeout,
            version_manager_id=version_manager_id,
        )
        try:
            execution_environment.start()
        except (StatusErrorException, EnvironmentStartupTimeoutException):
            # These two are passed through as-is instead of being wrapped below.
            raise
        except Exception as e:
            message = f"Could not start new environment: {type(e).__name__}:{e}"
            raise AssignmentException(message) from e
        return execution_environment

    def on_timeout(self, version_manager_id: str, environment_id: str) -> None:
        """Callback for deleting environment after function times out"""
        # Tolerate an environment that was already unregistered (e.g. by stop_environment),
        # and avoid creating an empty defaultdict entry for an unknown manager id.
        tracked = self.environments.get(version_manager_id)
        if tracked is not None:
            tracked.pop(environment_id, None)

    def stop_environment(self, environment: ExecutionEnvironment) -> None:
        """
        Stop `environment` and remove it from the registry (best effort).

        Errors raised while stopping are logged at debug level and not propagated.
        """
        version_manager_id = environment.version_manager_id
        try:
            environment.stop()
        except Exception as e:
            LOG.debug(
                "Error while stopping environment for lambda %s, manager id %s, environment: %s, error: %s",
                environment.function_version.qualified_arn,
                version_manager_id,
                environment.id,
                e,
            )
        finally:
            # Unregister even when stop() failed, so that an environment in an unknown state
            # is not offered to later invocations by get_environment.
            tracked = self.environments.get(version_manager_id)
            if tracked is not None:
                tracked.pop(environment.id, None)

    def stop_environments_for_version(self, version_manager_id: str) -> None:
        """Stop every environment registered for the given function-version manager."""
        # We have to materialize the list before iterating due to concurrency
        environments_to_stop = list(self.environments.get(version_manager_id, {}).values())
        for env in environments_to_stop:
            self.stop_environment(env)

    def scale_provisioned_concurrency(
        self,
        version_manager_id: str,
        function_version: FunctionVersion,
        target_provisioned_environments: int,
    ) -> list[Future[None]]:
        """
        Replace the provisioned-concurrency environments of a function-version manager.

        `target_provisioned_environments` new environments are registered and their start is
        submitted to the provisioning pool; every previously registered provisioned
        environment is then submitted for stopping.

        :return: the futures of all submitted start and stop tasks
        """
        # Snapshot before iterating: the mapping can be modified by concurrent invocations.
        current_provisioned_environments = [
            e
            for e in list(self.environments[version_manager_id].values())
            if e.initialization_type == InitializationType.provisioned_concurrency
        ]
        # TODO: refine scaling loop to re-use existing environments instead of re-creating all
        # current_provisioned_environments_count = len(current_provisioned_environments)
        # diff = target_provisioned_environments - current_provisioned_environments_count

        # TODO: handle case where no provisioned environment is available during scaling. Does AWS serve on-demand?
        # Most simple scaling implementation for now:
        futures = []
        # 1) Re-create new target
        for _ in range(target_provisioned_environments):
            execution_environment = ExecutionEnvironment(
                function_version=function_version,
                initialization_type=InitializationType.provisioned_concurrency,
                on_timeout=self.on_timeout,
                version_manager_id=version_manager_id,
            )
            # Registered before the start task has run.
            self.environments[version_manager_id][execution_environment.id] = execution_environment
            futures.append(self.provisioning_pool.submit(execution_environment.start))
        # 2) Kill all existing
        for env in current_provisioned_environments:
            # TODO: think about concurrent updates while deleting a function
            futures.append(self.provisioning_pool.submit(self.stop_environment, env))

        return futures

    def stop(self) -> None:
        """Shut down the provisioning pool, cancelling tasks that have not started yet."""
        self.provisioning_pool.shutdown(cancel_futures=True)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc