• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20357994067

18 Dec 2025 10:01PM UTC coverage: 86.913% (-0.02%) from 86.929%
20357994067

push

github

web-flow
Fix CloudWatch model annotation (#13545)

1 of 1 new or added line in 1 file covered. (100.0%)

1391 existing lines in 33 files now uncovered.

70000 of 80540 relevant lines covered (86.91%)

1.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.12
/localstack-core/localstack/services/lambda_/invocation/assignment.py
1
import contextlib
2✔
2
import logging
2✔
3
from collections import defaultdict
2✔
4
from concurrent.futures import Future, ThreadPoolExecutor
2✔
5

6
from localstack.services.lambda_.invocation.execution_environment import (
2✔
7
    EnvironmentStartupTimeoutException,
8
    ExecutionEnvironment,
9
    InvalidStatusException,
10
)
11
from localstack.services.lambda_.invocation.executor_endpoint import StatusErrorException
2✔
12
from localstack.services.lambda_.invocation.lambda_models import (
2✔
13
    FunctionVersion,
14
    InitializationType,
15
    OtherServiceEndpoint,
16
)
17

18
# Module-level logger, named after this module's import path.
LOG = logging.getLogger(__name__)
2✔
19

20

21
class AssignmentException(Exception):
    """Raised when no execution environment can be assigned or a new one fails to start."""
2✔
23

24

25
class AssignmentService(OtherServiceEndpoint):
    """
    Hands out Lambda execution environments to invocations and keeps a registry of them.

    Environments are tracked per version manager id. On-demand environments are started
    lazily in ``get_environment``; provisioned-concurrency environments are created and torn
    down through ``scale_provisioned_concurrency`` using a shared thread pool.

    scope: LocalStack global
    """

    # function_version manager id => runtime_environment_id => runtime_environment
    environments: dict[str, dict[str, ExecutionEnvironment]]

    # Global pool for spawning and killing provisioned Lambda runtime environments
    provisioning_pool: ThreadPoolExecutor

    def __init__(self):
        self.environments = defaultdict(dict)
        self.provisioning_pool = ThreadPoolExecutor(thread_name_prefix="lambda-provisioning-pool")

    @contextlib.contextmanager
    def get_environment(
        self,
        version_manager_id: str,
        function_version: FunctionVersion,
        provisioning_type: InitializationType,
    ) -> contextlib.AbstractContextManager[ExecutionEnvironment]:
        """
        Context manager yielding a reserved execution environment for one invocation.

        An already registered environment of the requested ``provisioning_type`` is reserved
        if possible. Otherwise, for ``"on-demand"``, a new environment is started, registered
        and reserved. On a clean exit the environment is released. If the ``with`` body (or
        the release) raises ``InvalidStatusException`` it is logged and not re-raised. Any
        other exception is logged, the environment is stopped, and the exception propagates.

        :param version_manager_id: key of the version manager owning the environments
        :param function_version: function version used when a new environment must be started
        :param provisioning_type: initialization type the environment must have
        :raises AssignmentException: if no provisioned-concurrency environment can be reserved,
            or if starting a new on-demand environment fails (see ``start_environment``)
        :raises ValueError: if ``provisioning_type`` is neither ``"provisioned-concurrency"``
            nor ``"on-demand"`` and no existing environment could be reserved
        """
        # Snapshot the registry before iterating: other threads add/remove environments
        # (timeouts, scaling, stops), and iterating the live dict view could fail mid-loop.
        registered = list(self.environments[version_manager_id].values())
        candidates = [env for env in registered if env.initialization_type == provisioning_type]

        execution_environment = None
        for candidate in candidates:
            try:
                candidate.reserve()
            except InvalidStatusException:
                # Candidate is not reservable in its current status; try the next one.
                continue
            execution_environment = candidate
            break

        if execution_environment is None:
            if provisioning_type == "provisioned-concurrency":
                raise AssignmentException(
                    "No provisioned concurrency environment available despite lease."
                )
            elif provisioning_type == "on-demand":
                execution_environment = self.start_environment(version_manager_id, function_version)
                self.environments[version_manager_id][execution_environment.id] = (
                    execution_environment
                )
                execution_environment.reserve()
            else:
                raise ValueError(f"Invalid provisioning type {provisioning_type}")

        try:
            yield execution_environment
            execution_environment.release()
        except InvalidStatusException as invalid_e:
            LOG.error("InvalidStatusException: %s", invalid_e)
        except Exception as e:
            LOG.error(
                "Failed invocation <%s>: %s", type(e), e, exc_info=LOG.isEnabledFor(logging.DEBUG)
            )
            self.stop_environment(execution_environment)
            # Bare raise re-raises the active exception without adding a traceback frame.
            raise

    def start_environment(
        self, version_manager_id: str, function_version: FunctionVersion
    ) -> ExecutionEnvironment:
        """
        Create and start a new execution environment for ``function_version``.

        The initialization type is ``"lambda-managed-instances"`` when the function config has
        a truthy ``CapacityProviderConfig`` and ``"on-demand"`` otherwise. The environment is
        NOT added to ``self.environments`` here; the caller registers it.

        :raises StatusErrorException: propagated unchanged from ``ExecutionEnvironment.start``
        :raises EnvironmentStartupTimeoutException: propagated unchanged from ``start``
        :raises AssignmentException: wraps any other exception raised by ``start``
        """
        LOG.debug("Starting new environment")
        initialization_type = "on-demand"
        if function_version.config.CapacityProviderConfig:
            initialization_type = "lambda-managed-instances"
        execution_environment = ExecutionEnvironment(
            function_version=function_version,
            initialization_type=initialization_type,
            on_timeout=self.on_timeout,
            version_manager_id=version_manager_id,
        )
        try:
            execution_environment.start()
        except (StatusErrorException, EnvironmentStartupTimeoutException):
            # These are passed through as-is rather than wrapped.
            raise
        except Exception as e:
            message = f"Could not start new environment: {type(e).__name__}:{e}"
            raise AssignmentException(message) from e
        return execution_environment

    def on_timeout(self, version_manager_id: str, environment_id: str) -> None:
        """
        Callback for deleting environment after function times out.

        Removing an environment that is no longer registered is a no-op.
        """
        # .get() avoids creating an empty defaultdict entry for unknown version managers;
        # the pop default tolerates an entry that stop_environment has already removed.
        self.environments.get(version_manager_id, {}).pop(environment_id, None)

    def stop_environment(self, environment: ExecutionEnvironment) -> None:
        """
        Stop ``environment`` (best effort) and remove it from the registry.

        Errors raised while stopping are logged at debug level and not propagated. The
        registry entry is removed even if stopping fails, so a broken environment does not
        stay registered.
        """
        version_manager_id = environment.version_manager_id
        try:
            environment.stop()
        except Exception as e:
            LOG.debug(
                "Error while stopping environment for lambda %s, manager id %s, environment: %s, error: %s",
                environment.function_version.qualified_arn,
                version_manager_id,
                environment.id,
                e,
            )
        finally:
            # Tolerate a missing version manager or an already-removed environment.
            self.environments.get(version_manager_id, {}).pop(environment.id, None)

    def stop_environments_for_version(self, version_manager_id: str):
        """Stop and unregister every environment registered for ``version_manager_id``."""
        # We have to materialize the list before iterating due to concurrency
        environments_to_stop = list(self.environments.get(version_manager_id, {}).values())
        for env in environments_to_stop:
            self.stop_environment(env)

    def scale_provisioned_concurrency(
        self,
        version_manager_id: str,
        function_version: FunctionVersion,
        target_provisioned_environments: int,
    ) -> list[Future[None]]:
        """
        Replace the provisioned-concurrency environments of a version with a fresh set.

        ``target_provisioned_environments`` new environments are registered and their start is
        submitted to the provisioning pool; every previously registered provisioned-concurrency
        environment is then submitted for stopping.

        :return: futures of all submitted start and stop tasks, starts first
        """
        current_provisioned_environments = [
            e
            for e in list(self.environments[version_manager_id].values())
            if e.initialization_type == "provisioned-concurrency"
        ]
        # TODO: refine scaling loop to re-use existing environments instead of re-creating all
        # current_provisioned_environments_count = len(current_provisioned_environments)
        # diff = target_provisioned_environments - current_provisioned_environments_count

        # TODO: handle case where no provisioned environment is available during scaling
        # Most simple scaling implementation for now:
        futures = []
        # 1) Re-create new target
        for _ in range(target_provisioned_environments):
            execution_environment = ExecutionEnvironment(
                function_version=function_version,
                initialization_type="provisioned-concurrency",
                on_timeout=self.on_timeout,
                version_manager_id=version_manager_id,
            )
            self.environments[version_manager_id][execution_environment.id] = execution_environment
            futures.append(self.provisioning_pool.submit(execution_environment.start))
        # 2) Kill all existing
        for env in current_provisioned_environments:
            # TODO: think about concurrent updates while deleting a function
            futures.append(self.provisioning_pool.submit(self.stop_environment, env))

        return futures

    def stop(self):
        """Shut down the provisioning pool, cancelling tasks that have not started yet."""
        self.provisioning_pool.shutdown(cancel_futures=True)
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc