• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 836de691-8bbb-4693-9ed3-7ab4d196cd89

20 Mar 2025 04:14PM UTC coverage: 86.818% (+0.009%) from 86.809%
836de691-8bbb-4693-9ed3-7ab4d196cd89

push

circleci

web-flow
S3: fix MA/MR test (#12417)

62787 of 72320 relevant lines covered (86.82%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.0
/localstack-core/localstack/services/lambda_/invocation/assignment.py
1
import contextlib
1✔
2
import logging
1✔
3
from collections import defaultdict
1✔
4
from concurrent.futures import Future, ThreadPoolExecutor
1✔
5
from typing import ContextManager
1✔
6

7
from localstack.services.lambda_.invocation.execution_environment import (
1✔
8
    EnvironmentStartupTimeoutException,
9
    ExecutionEnvironment,
10
    InvalidStatusException,
11
)
12
from localstack.services.lambda_.invocation.executor_endpoint import StatusErrorException
1✔
13
from localstack.services.lambda_.invocation.lambda_models import (
1✔
14
    FunctionVersion,
15
    InitializationType,
16
    OtherServiceEndpoint,
17
)
18
from localstack.utils.lambda_debug_mode.lambda_debug_mode import (
1✔
19
    is_lambda_debug_enabled_for,
20
    is_lambda_debug_timeout_enabled_for,
21
)
22

23
# Module-level logger shared by everything in this module.
LOG = logging.getLogger(__name__)
24

25

26
class AssignmentException(Exception):
    """Raised when no execution environment can be assigned or started."""
28

29

30
class AssignmentService(OtherServiceEndpoint):
    """
    Registry of Lambda execution environments, keyed by version manager id.

    Hands out reserved environments for invocations, starts on-demand
    environments when none can be reserved, and (re-)creates provisioned
    concurrency environments on a shared thread pool.

    scope: LocalStack global
    """

    # function_version manager id => runtime_environment_id => runtime_environment
    environments: dict[str, dict[str, ExecutionEnvironment]]

    # Global pool for spawning and killing provisioned Lambda runtime environments
    provisioning_pool: ThreadPoolExecutor

    def __init__(self):
        # defaultdict so that a first lookup for a version manager yields an empty mapping
        self.environments = defaultdict(dict)
        self.provisioning_pool = ThreadPoolExecutor(thread_name_prefix="lambda-provisioning-pool")

    @contextlib.contextmanager
    def get_environment(
        self,
        version_manager_id: str,
        function_version: FunctionVersion,
        provisioning_type: InitializationType,
    ) -> ContextManager[ExecutionEnvironment]:
        """
        Yield a reserved execution environment for a single invocation.

        An already registered environment of the requested ``provisioning_type``
        is reserved if possible. Otherwise a new environment is started, registered
        and reserved for "on-demand" requests.

        After the ``with`` body finishes normally, the environment is released, or
        stopped if the Lambda debug timeout mode is enabled for the function.
        If the body raises, the environment is stopped and the exception re-raised.

        NOTE: an ``InvalidStatusException`` raised by the ``with`` body or while
        releasing/stopping is only logged here and is not propagated to the caller.

        :param version_manager_id: key of the version manager owning the environments
        :param function_version: function version to start a new environment for
        :param provisioning_type: "on-demand" or "provisioned-concurrency"
        :raises AssignmentException: if no provisioned concurrency environment could be
            reserved, or if starting a new environment failed (see ``start_environment``)
        :raises ValueError: if ``provisioning_type`` is neither of the supported values
        """
        # Snapshot the candidates before reserving: the registry can be modified by other
        # threads (see stop_environments_for_version), and reserve() runs between steps.
        applicable_envs = [
            env
            for env in list(self.environments[version_manager_id].values())
            if env.initialization_type == provisioning_type
        ]
        execution_environment = None
        for environment in applicable_envs:
            try:
                environment.reserve()
            except InvalidStatusException:
                # This environment cannot be reserved right now; try the next one
                continue
            execution_environment = environment
            break

        if execution_environment is None:
            if provisioning_type == "provisioned-concurrency":
                raise AssignmentException(
                    "No provisioned concurrency environment available despite lease."
                )
            elif provisioning_type == "on-demand":
                execution_environment = self.start_environment(version_manager_id, function_version)
                self.environments[version_manager_id][execution_environment.id] = (
                    execution_environment
                )
                execution_environment.reserve()
            else:
                raise ValueError(f"Invalid provisioning type {provisioning_type}")

        try:
            yield execution_environment
            if is_lambda_debug_timeout_enabled_for(lambda_arn=function_version.qualified_arn):
                self.stop_environment(execution_environment)
            else:
                execution_environment.release()
        except InvalidStatusException as invalid_e:
            LOG.error("InvalidStatusException: %s", invalid_e)
        except Exception as e:
            LOG.error("Failed invocation %s", e)
            self.stop_environment(execution_environment)
            # Bare raise keeps the original traceback intact
            raise

    def start_environment(
        self, version_manager_id: str, function_version: FunctionVersion
    ) -> ExecutionEnvironment:
        """
        Create and start a new "on-demand" execution environment.

        The environment is not added to ``self.environments`` here; the caller does that.

        :raises StatusErrorException: passed through unchanged from ``start()``
        :raises EnvironmentStartupTimeoutException: passed through unchanged from ``start()``
        :raises AssignmentException: wraps any other exception raised by ``start()``
        """
        LOG.debug("Starting new environment")
        execution_environment = ExecutionEnvironment(
            function_version=function_version,
            initialization_type="on-demand",
            on_timeout=self.on_timeout,
            version_manager_id=version_manager_id,
        )
        try:
            execution_environment.start()
        except (StatusErrorException, EnvironmentStartupTimeoutException):
            # These are passed to the caller as-is instead of being wrapped
            raise
        except Exception as e:
            message = f"Could not start new environment: {e}"
            raise AssignmentException(message) from e
        return execution_environment

    def on_timeout(self, version_manager_id: str, environment_id: str) -> None:
        """Callback for deleting environment after function times out"""
        # Idempotent removal: the entry may already be gone (e.g. removed by
        # stop_environment), and plain .get() avoids creating an empty defaultdict
        # entry for an unknown version manager id.
        version_environments = self.environments.get(version_manager_id)
        if version_environments is not None:
            version_environments.pop(environment_id, None)

    def stop_environment(self, environment: ExecutionEnvironment) -> None:
        """
        Stop the given environment and remove it from the registry.

        Best effort: any exception is logged at debug level and not propagated.
        The environment is only unregistered if ``stop()`` did not raise.
        """
        version_manager_id = environment.version_manager_id
        try:
            environment.stop()
            # Tolerate an entry that was already removed (e.g. by on_timeout) so a
            # successful stop is not reported as an error.
            self.environments.get(version_manager_id, {}).pop(environment.id, None)
        except Exception as e:
            LOG.debug(
                "Error while stopping environment for lambda %s, manager id %s, environment: %s, error: %s",
                environment.function_version.qualified_arn,
                version_manager_id,
                environment.id,
                e,
            )

    def stop_environments_for_version(self, version_manager_id: str):
        """Stop every environment currently registered for the given version manager id."""
        # We have to materialize the list before iterating due to concurrency
        environments_to_stop = list(self.environments.get(version_manager_id, {}).values())
        for env in environments_to_stop:
            self.stop_environment(env)

    def scale_provisioned_concurrency(
        self,
        version_manager_id: str,
        function_version: FunctionVersion,
        target_provisioned_environments: int,
    ) -> list[Future[None]]:
        """
        Replace the provisioned concurrency environments of a version with new ones.

        New environments are registered immediately and started on the provisioning
        pool; all previously registered provisioned environments are stopped on the
        same pool.

        :param version_manager_id: key of the version manager owning the environments
        :param function_version: function version the new environments are created for
        :param target_provisioned_environments: number of environments to create
        :return: futures of the submitted start tasks followed by the stop tasks
        """
        # Enforce a single environment per lambda version if this is a target
        # of an active Lambda Debug Mode.
        qualified_lambda_version_arn = function_version.qualified_arn
        if (
            is_lambda_debug_enabled_for(qualified_lambda_version_arn)
            and target_provisioned_environments > 0
        ):
            LOG.warning(
                "Environments for '%s' enforced to '1' by Lambda Debug Mode, "
                "configurations will continue to report the set value '%s'",
                qualified_lambda_version_arn,
                target_provisioned_environments,
            )
            target_provisioned_environments = 1

        # Captured before new environments are registered, so only the old ones get stopped
        current_provisioned_environments = [
            e
            for e in self.environments[version_manager_id].values()
            if e.initialization_type == "provisioned-concurrency"
        ]
        # TODO: refine scaling loop to re-use existing environments instead of re-creating all
        # current_provisioned_environments_count = len(current_provisioned_environments)
        # diff = target_provisioned_environments - current_provisioned_environments_count

        # TODO: handle case where no provisioned environment is available during scaling
        # Most simple scaling implementation for now:
        futures = []
        # 1) Re-create new target
        for _ in range(target_provisioned_environments):
            execution_environment = ExecutionEnvironment(
                function_version=function_version,
                initialization_type="provisioned-concurrency",
                on_timeout=self.on_timeout,
                version_manager_id=version_manager_id,
            )
            self.environments[version_manager_id][execution_environment.id] = execution_environment
            futures.append(self.provisioning_pool.submit(execution_environment.start))
        # 2) Kill all existing
        for env in current_provisioned_environments:
            # TODO: think about concurrent updates while deleting a function
            futures.append(self.provisioning_pool.submit(self.stop_environment, env))

        return futures

    def stop(self):
        """Shut down the provisioning pool, cancelling futures that have not started yet."""
        self.provisioning_pool.shutdown(cancel_futures=True)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc