• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 21708052316

05 Feb 2026 10:30AM UTC coverage: 86.873% (-0.09%) from 86.962%
21708052316

push

github

web-flow
Deprecate TaggingService (#13697)

2 of 2 new or added lines in 1 file covered. (100.0%)

158 existing lines in 15 files now uncovered.

69932 of 80499 relevant lines covered (86.87%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.19
/localstack-core/localstack/services/lambda_/invocation/assignment.py
1
import contextlib
1✔
2
import logging
1✔
3
from collections import defaultdict
1✔
4
from collections.abc import Iterator
1✔
5
from concurrent.futures import Future, ThreadPoolExecutor
1✔
6

7
from localstack.services.lambda_.invocation.execution_environment import (
1✔
8
    EnvironmentStartupTimeoutException,
9
    ExecutionEnvironment,
10
    InvalidStatusException,
11
)
12
from localstack.services.lambda_.invocation.executor_endpoint import StatusErrorException
1✔
13
from localstack.services.lambda_.invocation.lambda_models import (
1✔
14
    FunctionVersion,
15
    InitializationType,
16
    OtherServiceEndpoint,
17
)
18

19
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
1✔
20

21

22
class AssignmentException(Exception):
    """Error raised when no execution environment can be assigned or started."""
1✔
24

25

26
class AssignmentService(OtherServiceEndpoint):
    """
    Keeps track of execution environments per version manager and hands them out to invocations.

    scope: LocalStack global
    """

    # function_version manager id => runtime_environment_id => runtime_environment
    environments: dict[str, dict[str, ExecutionEnvironment]]

    # Global pool for spawning and killing provisioned Lambda runtime environments
    provisioning_pool: ThreadPoolExecutor

    def __init__(self):
        # defaultdict: indexing an unknown version manager id yields (and stores) an empty mapping
        self.environments = defaultdict(dict)
        self.provisioning_pool = ThreadPoolExecutor(thread_name_prefix="lambda-provisioning-pool")

    @contextlib.contextmanager
    def get_environment(
        self,
        version_manager_id: str,
        function_version: FunctionVersion,
        provisioning_type: InitializationType,
    ) -> Iterator[ExecutionEnvironment]:
        """
        Context manager that reserves an execution environment, yields it, and releases it afterwards.

        An already registered environment with a matching initialization type is reserved if possible.
        Otherwise, a new environment is started for on-demand requests.

        :param version_manager_id: key into ``self.environments`` for the environments to choose from
        :param function_version: function version used if a new environment has to be started
        :param provisioning_type: initialization type the environment must have
        :raises AssignmentException: if a provisioned concurrency environment is requested but none
            could be reserved, or if starting a new environment fails unexpectedly
        :raises ValueError: if no environment could be reserved and ``provisioning_type`` is neither
            provisioned concurrency nor on-demand

        An ``InvalidStatusException`` raised by the ``with`` body or by ``release()`` is logged and not
        re-raised. Any other exception is logged, the environment is stopped, and the exception is
        re-raised.
        """
        # Take a snapshot before iterating: other threads (provisioning pool, timeout callback) add
        # and remove entries, and iterating the live dict view could fail if its size changes.
        candidates = list(self.environments[version_manager_id].values())
        execution_environment = None
        for environment in candidates:
            if environment.initialization_type != provisioning_type:
                continue
            try:
                environment.reserve()
            except InvalidStatusException:
                # reserve() rejected this environment; try the next candidate
                continue
            execution_environment = environment
            break

        if execution_environment is None:
            if provisioning_type == InitializationType.provisioned_concurrency:
                raise AssignmentException(
                    "No provisioned concurrency environment available despite lease."
                )
            elif provisioning_type == InitializationType.on_demand:
                execution_environment = self.start_environment(version_manager_id, function_version)
                self.environments[version_manager_id][execution_environment.id] = (
                    execution_environment
                )
                execution_environment.reserve()
            else:
                raise ValueError(f"Invalid provisioning type {provisioning_type}")

        try:
            yield execution_environment
            execution_environment.release()
        except InvalidStatusException as invalid_e:
            LOG.error("InvalidStatusException: %s", invalid_e)
        except Exception as e:
            LOG.error(
                "Failed invocation <%s>: %s", type(e), e, exc_info=LOG.isEnabledFor(logging.DEBUG)
            )
            self.stop_environment(execution_environment)
            raise

    def start_environment(
        self, version_manager_id: str, function_version: FunctionVersion
    ) -> ExecutionEnvironment:
        """
        Create and start a new execution environment for the given function version.

        The environment is not registered in ``self.environments`` here; that is left to the caller.

        :param version_manager_id: id passed on to the new environment
        :param function_version: function version the environment is created for
        :return: the started environment
        :raises StatusErrorException: propagated unchanged from ``start()``
        :raises EnvironmentStartupTimeoutException: propagated unchanged from ``start()``
        :raises AssignmentException: wraps any other exception raised by ``start()``
        """
        LOG.debug("Starting new environment")
        initialization_type = InitializationType.on_demand
        if function_version.config.CapacityProviderConfig:
            initialization_type = InitializationType.lambda_managed_instances
        execution_environment = ExecutionEnvironment(
            function_version=function_version,
            initialization_type=initialization_type,
            on_timeout=self.on_timeout,
            version_manager_id=version_manager_id,
        )
        try:
            execution_environment.start()
        except (StatusErrorException, EnvironmentStartupTimeoutException):
            # These two are passed through as-is instead of being wrapped
            raise
        except Exception as e:
            message = f"Could not start new environment: {type(e).__name__}:{e}"
            raise AssignmentException(message) from e
        return execution_environment

    def on_timeout(self, version_manager_id: str, environment_id: str) -> None:
        """Callback for deleting environment after function times out"""
        # Tolerate an entry that is already gone, and use .get() so that an unknown manager id does
        # not make the defaultdict create an empty mapping.
        self.environments.get(version_manager_id, {}).pop(environment_id, None)

    def stop_environment(self, environment: ExecutionEnvironment) -> None:
        """
        Stop the given environment and remove it from ``self.environments``.

        Errors are logged at debug level and not raised. The environment is only removed after
        ``stop()`` succeeded; an entry that is already missing is not treated as an error.
        """
        version_manager_id = environment.version_manager_id
        try:
            environment.stop()
            self.environments.get(version_manager_id, {}).pop(environment.id, None)
        except Exception as e:
            LOG.debug(
                "Error while stopping environment for lambda %s, manager id %s, environment: %s, error: %s",
                environment.function_version.qualified_arn,
                version_manager_id,
                environment.id,
                e,
            )

    def stop_environments_for_version(self, version_manager_id: str):
        """Stop every environment currently registered for the given version manager id."""
        # We have to materialize the list before iterating due to concurrency
        environments_to_stop = list(self.environments.get(version_manager_id, {}).values())
        for env in environments_to_stop:
            self.stop_environment(env)

    def scale_provisioned_concurrency(
        self,
        version_manager_id: str,
        function_version: FunctionVersion,
        target_provisioned_environments: int,
    ) -> list[Future[None]]:
        """
        Replace all provisioned concurrency environments of a version manager with new ones.

        New environments are registered immediately and started on the provisioning pool; the
        previously registered provisioned environments are stopped on the same pool.

        :param version_manager_id: key into ``self.environments`` whose environments are scaled
        :param function_version: function version the new environments are created for
        :param target_provisioned_environments: number of new environments to create
        :return: futures of the submitted start and stop tasks
        """
        # Snapshot the values first (see stop_environments_for_version): the mapping is modified
        # concurrently by other threads.
        current_provisioned_environments = [
            e
            for e in list(self.environments[version_manager_id].values())
            if e.initialization_type == InitializationType.provisioned_concurrency
        ]
        # TODO: refine scaling loop to re-use existing environments instead of re-creating all
        # current_provisioned_environments_count = len(current_provisioned_environments)
        # diff = target_provisioned_environments - current_provisioned_environments_count

        # TODO: handle case where no provisioned environment is available during scaling
        # Most simple scaling implementation for now:
        futures = []
        # 1) Re-create new target
        for _ in range(target_provisioned_environments):
            execution_environment = ExecutionEnvironment(
                function_version=function_version,
                initialization_type=InitializationType.provisioned_concurrency,
                on_timeout=self.on_timeout,
                version_manager_id=version_manager_id,
            )
            self.environments[version_manager_id][execution_environment.id] = execution_environment
            futures.append(self.provisioning_pool.submit(execution_environment.start))
        # 2) Kill all existing
        for env in current_provisioned_environments:
            # TODO: think about concurrent updates while deleting a function
            futures.append(self.provisioning_pool.submit(self.stop_environment, env))

        return futures

    def stop(self):
        """Shut down the provisioning pool, cancelling tasks that have not started yet."""
        self.provisioning_pool.shutdown(cancel_futures=True)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc