• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20565403496

29 Dec 2025 05:11AM UTC coverage: 84.103% (-2.8%) from 86.921%
20565403496

Pull #13567

github

web-flow
Merge 4816837a5 into 2417384aa
Pull Request #13567: Update ASF APIs

67166 of 79862 relevant lines covered (84.1%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.12
/localstack-core/localstack/services/lambda_/invocation/assignment.py
1
import contextlib
1✔
2
import logging
1✔
3
from collections import defaultdict
1✔
4
from concurrent.futures import Future, ThreadPoolExecutor
1✔
5

6
from localstack.services.lambda_.invocation.execution_environment import (
1✔
7
    EnvironmentStartupTimeoutException,
8
    ExecutionEnvironment,
9
    InvalidStatusException,
10
)
11
from localstack.services.lambda_.invocation.executor_endpoint import StatusErrorException
1✔
12
from localstack.services.lambda_.invocation.lambda_models import (
1✔
13
    FunctionVersion,
14
    InitializationType,
15
    OtherServiceEndpoint,
16
)
17

18
# Module-level logger, named after this module's import path.
LOG = logging.getLogger(__name__)
1✔
19

20

21
class AssignmentException(Exception):
    """Raised when no execution environment can be assigned to an invocation.

    In this module it is raised when no provisioned-concurrency environment
    can be reserved, and when starting a new environment fails with an
    unexpected error.
    """
23

24

25
class AssignmentService(OtherServiceEndpoint):
    """
    Keeps track of Lambda execution environments per version manager and
    hands them out for invocations.

    scope: LocalStack global
    """

    # function_version manager id => runtime_environment_id => runtime_environment
    environments: dict[str, dict[str, ExecutionEnvironment]]

    # Global pool for spawning and killing provisioned Lambda runtime environments
    provisioning_pool: ThreadPoolExecutor

    def __init__(self):
        # defaultdict: indexing with an unseen version manager id yields an empty registry
        self.environments = defaultdict(dict)
        self.provisioning_pool = ThreadPoolExecutor(thread_name_prefix="lambda-provisioning-pool")

    @contextlib.contextmanager
    def get_environment(
        self,
        version_manager_id: str,
        function_version: FunctionVersion,
        provisioning_type: InitializationType,
    ) -> contextlib.AbstractContextManager[ExecutionEnvironment]:
        """Reserve an execution environment for the duration of a ``with`` block.

        An already registered environment with a matching initialization type
        is reserved if possible. Otherwise, a new environment is started for
        "on-demand" requests.

        :param version_manager_id: key of the registry to look up environments in
        :param function_version: function version used when a new environment is started
        :param provisioning_type: initialization type the environment must have
        :raises AssignmentException: if no "provisioned-concurrency" environment could be reserved
        :raises ValueError: if no environment could be reserved and the provisioning type is
            neither "provisioned-concurrency" nor "on-demand"

        On normal exit of the ``with`` block the environment is released.
        An ``InvalidStatusException`` raised by the block or by the release is
        logged and not re-raised. Any other exception stops the environment
        and is re-raised.
        """
        # Snapshot the registry before iterating: other threads may add or remove
        # environments concurrently, and iterating the live dict view would then
        # raise "dictionary changed size during iteration".
        applicable_envs = [
            env
            for env in list(self.environments[version_manager_id].values())
            if env.initialization_type == provisioning_type
        ]
        execution_environment = None
        for environment in applicable_envs:
            try:
                environment.reserve()
            except InvalidStatusException:
                # This environment cannot be reserved right now; try the next one
                continue
            execution_environment = environment
            break

        if execution_environment is None:
            if provisioning_type == "provisioned-concurrency":
                raise AssignmentException(
                    "No provisioned concurrency environment available despite lease."
                )
            elif provisioning_type == "on-demand":
                execution_environment = self.start_environment(version_manager_id, function_version)
                self.environments[version_manager_id][execution_environment.id] = (
                    execution_environment
                )
                execution_environment.reserve()
            else:
                raise ValueError(f"Invalid provisioning type {provisioning_type}")

        try:
            yield execution_environment
            execution_environment.release()
        except InvalidStatusException as invalid_e:
            LOG.error("InvalidStatusException: %s", invalid_e)
        except Exception as e:
            LOG.error(
                "Failed invocation <%s>: %s", type(e), e, exc_info=LOG.isEnabledFor(logging.DEBUG)
            )
            self.stop_environment(execution_environment)
            # Bare raise re-raises the active exception unchanged
            raise

    def start_environment(
        self, version_manager_id: str, function_version: FunctionVersion
    ) -> ExecutionEnvironment:
        """Create and start a new execution environment.

        The initialization type is "lambda-managed-instances" if the function
        configuration has a truthy ``CapacityProviderConfig``, otherwise "on-demand".
        The environment is NOT added to ``self.environments`` here.

        :param version_manager_id: version manager id passed on to the environment
        :param function_version: function version to start the environment for
        :return: the started execution environment
        :raises StatusErrorException: propagated unchanged from the environment start
        :raises EnvironmentStartupTimeoutException: propagated unchanged from the environment start
        :raises AssignmentException: wraps any other exception raised during start
        """
        LOG.debug("Starting new environment")
        initialization_type = "on-demand"
        if function_version.config.CapacityProviderConfig:
            initialization_type = "lambda-managed-instances"
        execution_environment = ExecutionEnvironment(
            function_version=function_version,
            initialization_type=initialization_type,
            on_timeout=self.on_timeout,
            version_manager_id=version_manager_id,
        )
        try:
            execution_environment.start()
        except (StatusErrorException, EnvironmentStartupTimeoutException):
            # These are passed through unchanged instead of being wrapped below
            raise
        except Exception as e:
            message = f"Could not start new environment: {type(e).__name__}:{e}"
            raise AssignmentException(message) from e
        return execution_environment

    def on_timeout(self, version_manager_id: str, environment_id: str) -> None:
        """Callback for deleting environment after function times out"""
        del self.environments[version_manager_id][environment_id]

    def stop_environment(self, environment: ExecutionEnvironment) -> None:
        """Stop the given environment and remove it from the registry (best effort).

        Any exception raised while stopping is logged at debug level and not
        propagated; in that case the environment stays registered.
        """
        version_manager_id = environment.version_manager_id
        try:
            environment.stop()
            # Tolerate a missing registry or entry: the environment has been stopped,
            # so an already-removed entry is not an error worth reporting.
            self.environments.get(version_manager_id, {}).pop(environment.id, None)
        except Exception as e:
            LOG.debug(
                "Error while stopping environment for lambda %s, manager id %s, environment: %s, error: %s",
                environment.function_version.qualified_arn,
                version_manager_id,
                environment.id,
                e,
            )

    def stop_environments_for_version(self, version_manager_id: str):
        """Stop every environment currently registered for the given version manager."""
        # We have to materialize the list before iterating due to concurrency
        environments_to_stop = list(self.environments.get(version_manager_id, {}).values())
        for env in environments_to_stop:
            self.stop_environment(env)

    def scale_provisioned_concurrency(
        self,
        version_manager_id: str,
        function_version: FunctionVersion,
        target_provisioned_environments: int,
    ) -> list[Future[None]]:
        """Replace all provisioned-concurrency environments with a fresh set.

        New environments are registered immediately and started on the
        provisioning pool; the previously registered provisioned environments
        are stopped on the same pool.

        :param version_manager_id: key of the registry to scale
        :param function_version: function version for the new environments
        :param target_provisioned_environments: number of new environments to create
        :return: futures of all submitted start and stop tasks
        """
        # Snapshot the registry first so concurrent modifications cannot break the iteration
        current_provisioned_environments = [
            e
            for e in list(self.environments[version_manager_id].values())
            if e.initialization_type == "provisioned-concurrency"
        ]
        # TODO: refine scaling loop to re-use existing environments instead of re-creating all
        # current_provisioned_environments_count = len(current_provisioned_environments)
        # diff = target_provisioned_environments - current_provisioned_environments_count

        # TODO: handle case where no provisioned environment is available during scaling
        # Most simple scaling implementation for now:
        futures = []
        # 1) Re-create new target
        for _ in range(target_provisioned_environments):
            execution_environment = ExecutionEnvironment(
                function_version=function_version,
                initialization_type="provisioned-concurrency",
                on_timeout=self.on_timeout,
                version_manager_id=version_manager_id,
            )
            self.environments[version_manager_id][execution_environment.id] = execution_environment
            futures.append(self.provisioning_pool.submit(execution_environment.start))
        # 2) Kill all existing
        for env in current_provisioned_environments:
            # TODO: think about concurrent updates while deleting a function
            futures.append(self.provisioning_pool.submit(self.stop_environment, env))

        return futures

    def stop(self):
        """Shut down the provisioning pool, cancelling tasks that have not started yet."""
        self.provisioning_pool.shutdown(cancel_futures=True)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc