• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 22700205643

04 Mar 2026 04:24PM UTC coverage: 86.938% (-0.01%) from 86.951%
22700205643

push

github

web-flow
Lambda: fix attribute exceptions (#13863)

3 of 4 new or added lines in 1 file covered. (75.0%)

78 existing lines in 5 files now uncovered.

69850 of 80345 relevant lines covered (86.94%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.33
/localstack-core/localstack/services/plugins.py
1
import abc
1✔
2
import functools
1✔
3
import logging
1✔
4
import threading
1✔
5
from collections import defaultdict
1✔
6
from collections.abc import Callable
1✔
7
from concurrent.futures import ThreadPoolExecutor
1✔
8
from enum import Enum
1✔
9
from typing import Protocol
1✔
10

11
from plux import Plugin, PluginLifecycleListener, PluginManager, PluginSpec
1✔
12

13
from localstack import config
1✔
14
from localstack.aws.skeleton import DispatchTable, Skeleton
1✔
15
from localstack.aws.spec import load_service
1✔
16
from localstack.config import ServiceProviderConfig
1✔
17
from localstack.runtime import hooks
1✔
18
from localstack.state import StateLifecycleHook, StateVisitable, StateVisitor
1✔
19
from localstack.utils.bootstrap import get_enabled_apis, is_api_enabled, log_duration
1✔
20
from localstack.utils.functions import call_safe
1✔
21
from localstack.utils.sync import SynchronizedDefaultDict, poll_condition
1✔
22

23
# set up logger
24
LOG = logging.getLogger(__name__)
1✔
25

26
# namespace for AWS provider plugins
27
PLUGIN_NAMESPACE = "localstack.aws.provider"
1✔
28

29
_default = object()  # sentinel object indicating a default value
1✔
30

31

32
# -----------------
33
# PLUGIN UTILITIES
34
# -----------------
35

36

37
class ServiceException(Exception):
1✔
38
    pass
1✔
39

40

41
class ServiceDisabled(ServiceException):
1✔
42
    pass
1✔
43

44

45
class ServiceStateException(ServiceException):
1✔
46
    pass
1✔
47

48

49
class ServiceLifecycleHook(StateLifecycleHook):
1✔
50
    def on_after_init(self):
1✔
51
        pass
1✔
52

53
    def on_before_start(self):
1✔
54
        pass
1✔
55

56
    def on_before_stop(self):
1✔
57
        pass
1✔
58

59
    def on_exception(self):
1✔
60
        pass
×
61

62

63
class ServiceProvider(Protocol):
1✔
64
    service: str
1✔
65

66

67
class Service:
1✔
68
    """
69
    FIXME: this has become frankenstein's monster, and it has to go. once we've rid ourselves of the legacy edge
70
     proxy, we can get rid of the ``listener`` concept. we should then do one iteration over all the
71
     ``start_dynamodb``, ``start_<whatever>``, ``check_<whatever>``, etc. methods, to make all of those integral part
72
     of the service provider. the assumption that every service provider starts a backend server is outdated, and then
73
     we can get rid of ``start``, and ``check``.
74
    """
75

76
    def __init__(
1✔
77
        self,
78
        name,
79
        start=_default,
80
        check=None,
81
        skeleton=None,
82
        active=False,
83
        stop=None,
84
        lifecycle_hook: ServiceLifecycleHook = None,
85
    ):
86
        self.plugin_name = name
1✔
87
        self.start_function = start
1✔
88
        self.skeleton = skeleton
1✔
89
        self.check_function = check
1✔
90
        self.default_active = active
1✔
91
        self.stop_function = stop
1✔
92
        self.lifecycle_hook = lifecycle_hook or ServiceLifecycleHook()
1✔
93
        self._provider = None
1✔
94
        call_safe(self.lifecycle_hook.on_after_init)
1✔
95

96
    def start(self, asynchronous):
1✔
97
        call_safe(self.lifecycle_hook.on_before_start)
1✔
98

99
        if not self.start_function:
1✔
100
            return
×
101

102
        if self.start_function is _default:
1✔
103
            return
1✔
104

105
        kwargs = {"asynchronous": asynchronous}
×
106
        if self.skeleton:
×
107
            kwargs["update_listener"] = self.skeleton
×
108
        return self.start_function(**kwargs)
×
109

110
    def stop(self):
1✔
111
        call_safe(self.lifecycle_hook.on_before_stop)
1✔
112
        if not self.stop_function:
1✔
113
            return
1✔
114
        return self.stop_function()
×
115

116
    def check(self, expect_shutdown=False, print_error=False):
1✔
117
        if not self.check_function:
1✔
118
            return
1✔
119
        return self.check_function(expect_shutdown=expect_shutdown, print_error=print_error)
×
120

121
    def name(self):
1✔
122
        return self.plugin_name
1✔
123

124
    def is_enabled(self):
1✔
125
        return is_api_enabled(self.plugin_name)
1✔
126

127
    def accept_state_visitor(self, visitor: StateVisitor):
1✔
128
        """
129
        Passes the StateVisitor to the ASF provider if it is set and implements the StateVisitable. Otherwise, it uses
130
        the ReflectionStateLocator to visit the service state.
131

132
        :param visitor: the visitor
133
        """
134
        if self._provider and isinstance(self._provider, StateVisitable):
×
135
            self._provider.accept_state_visitor(visitor)
×
136
            return
×
137

138
        from localstack.state.inspect import ReflectionStateLocator
×
139

140
        ReflectionStateLocator(service=self.name()).accept_state_visitor(visitor)
×
141

142
    @staticmethod
1✔
143
    def for_provider(
1✔
144
        provider: ServiceProvider,
145
        dispatch_table_factory: Callable[[ServiceProvider], DispatchTable] = None,
146
        service_lifecycle_hook: ServiceLifecycleHook = None,
147
    ) -> "Service":
148
        """
149
        Factory method for creating services for providers. This method hides a bunch of legacy code and
150
        band-aids/adapters to make persistence visitors work, while providing compatibility with the legacy edge proxy.
151

152
        :param provider: the service provider, i.e., the implementation of the generated ASF service API.
153
        :param dispatch_table_factory: a `MotoFallbackDispatcher` or something similar that uses the provider to
154
            create a dispatch table. this one's a bit clumsy.
155
        :param service_lifecycle_hook: if left empty, the factory checks whether the provider is a ServiceLifecycleHook.
156
        :return: a service instance
157
        """
158
        # determine the service_lifecycle_hook
159
        if service_lifecycle_hook is None:
1✔
160
            if isinstance(provider, ServiceLifecycleHook):
1✔
161
                service_lifecycle_hook = provider
1✔
162

163
        # determine the delegate for injecting into the skeleton
164
        delegate = dispatch_table_factory(provider) if dispatch_table_factory else provider
1✔
165
        service = Service(
1✔
166
            name=provider.service,
167
            skeleton=Skeleton(load_service(provider.service), delegate),
168
            lifecycle_hook=service_lifecycle_hook,
169
        )
170
        service._provider = provider
1✔
171

172
        return service
1✔
173

174

175
class ServiceState(Enum):
1✔
176
    UNKNOWN = "unknown"
1✔
177
    AVAILABLE = "available"
1✔
178
    DISABLED = "disabled"
1✔
179
    STARTING = "starting"
1✔
180
    RUNNING = "running"
1✔
181
    STOPPING = "stopping"
1✔
182
    STOPPED = "stopped"
1✔
183
    ERROR = "error"
1✔
184

185

186
class ServiceContainer:
1✔
187
    """
188
    Holds a service, its state, and exposes lifecycle methods of the service.
189
    """
190

191
    service: Service
1✔
192
    state: ServiceState
1✔
193
    lock: threading.RLock
1✔
194
    errors: list[Exception]
1✔
195

196
    def __init__(self, service: Service, state=ServiceState.UNKNOWN):
1✔
197
        self.service = service
1✔
198
        self.state = state
1✔
199
        self.lock = threading.RLock()
1✔
200
        self.errors = []
1✔
201

202
    def get(self) -> Service:
1✔
203
        return self.service
×
204

205
    def start(self) -> bool:
1✔
206
        try:
1✔
207
            self.state = ServiceState.STARTING
1✔
208
            self.service.start(asynchronous=True)
1✔
209
        except Exception as e:
×
210
            self.state = ServiceState.ERROR
×
211
            self.errors.append(e)
×
212
            LOG.error("error while starting service %s: %s", self.service.name(), e)
×
213
            return False
×
214
        return self.check()
1✔
215

216
    def check(self) -> bool:
1✔
217
        try:
1✔
218
            self.service.check(print_error=True)
1✔
219
            self.state = ServiceState.RUNNING
1✔
220
            return True
1✔
221
        except Exception as e:
×
222
            self.state = ServiceState.ERROR
×
223
            self.errors.append(e)
×
224
            LOG.error("error while checking service %s: %s", self.service.name(), e)
×
225
            return False
×
226

227
    def stop(self):
1✔
228
        try:
1✔
229
            self.state = ServiceState.STOPPING
1✔
230
            self.service.stop()
1✔
231
            self.state = ServiceState.STOPPED
1✔
232
        except Exception as e:
×
233
            self.state = ServiceState.ERROR
×
234
            self.errors.append(e)
×
235

236

237
class ServiceManager:
1✔
238
    def __init__(self) -> None:
1✔
239
        super().__init__()
1✔
240
        self._services: dict[str, ServiceContainer] = {}
1✔
241
        self._mutex = threading.RLock()
1✔
242

243
    def get_service_container(self, name: str) -> ServiceContainer | None:
1✔
244
        return self._services.get(name)
1✔
245

246
    def get_service(self, name: str) -> Service | None:
1✔
247
        container = self.get_service_container(name)
1✔
248
        return container.service if container else None
1✔
249

250
    def add_service(self, service: Service) -> bool:
1✔
251
        state = ServiceState.AVAILABLE if service.is_enabled() else ServiceState.DISABLED
1✔
252
        self._services[service.name()] = ServiceContainer(service, state)
1✔
253

254
        return True
1✔
255

256
    def list_available(self) -> list[str]:
1✔
257
        return list(self._services.keys())
×
258

259
    def exists(self, name: str) -> bool:
1✔
260
        return name in self._services
×
261

262
    def is_running(self, name: str) -> bool:
1✔
263
        return self.get_state(name) == ServiceState.RUNNING
×
264

265
    def check(self, name: str) -> bool:
1✔
266
        if self.get_state(name) in [ServiceState.RUNNING, ServiceState.ERROR]:
×
267
            return self.get_service_container(name).check()
×
268

269
    def check_all(self):
1✔
270
        return any(self.check(service_name) for service_name in self.list_available())
×
271

272
    def get_state(self, name: str) -> ServiceState | None:
1✔
273
        container = self.get_service_container(name)
1✔
274
        return container.state if container else None
1✔
275

276
    def get_states(self) -> dict[str, ServiceState]:
1✔
277
        return {name: self.get_state(name) for name in self.list_available()}
1✔
278

279
    @log_duration()
1✔
280
    def require(self, name: str) -> Service:
1✔
281
        """
282
        High level function that always returns a running service, or raises an error. If the service is in a state
283
        that it could be transitioned into a running state, then invoking this function will attempt that transition,
284
        e.g., by starting the service if it is available.
285
        """
286
        container = self.get_service_container(name)
1✔
287

288
        if not container:
1✔
289
            raise ValueError(f"no such service {name}")
×
290

291
        if container.state == ServiceState.STARTING:
1✔
292
            if not poll_condition(lambda: container.state != ServiceState.STARTING, timeout=30):
1✔
293
                raise TimeoutError(f"gave up waiting for service {name} to start")
×
294

295
        if container.state == ServiceState.STOPPING:
1✔
296
            if not poll_condition(lambda: container.state == ServiceState.STOPPED, timeout=30):
×
297
                raise TimeoutError(f"gave up waiting for service {name} to stop")
×
298

299
        with container.lock:
1✔
300
            if container.state == ServiceState.DISABLED:
1✔
301
                raise ServiceDisabled(f"service {name} is disabled")
×
302

303
            if container.state == ServiceState.RUNNING:
1✔
304
                return container.service
1✔
305

306
            if container.state == ServiceState.ERROR:
1✔
307
                # raise any capture error
308
                raise container.errors[-1]
×
309

310
            if container.state == ServiceState.AVAILABLE or container.state == ServiceState.STOPPED:
1✔
311
                if container.start():
1✔
312
                    return container.service
1✔
313
                else:
314
                    raise container.errors[-1]
×
315

316
        raise ServiceStateException(
×
317
            f"service {name} is not ready ({container.state}) and could not be started"
318
        )
319

320
    # legacy map compatibility
321

322
    def items(self):
1✔
323
        return {
×
324
            container.service.name(): container.service for container in self._services.values()
325
        }.items()
326

327
    def keys(self):
1✔
328
        return self._services.keys()
×
329

330
    def values(self):
1✔
331
        return [container.service for container in self._services.values()]
×
332

333
    def get(self, key):
1✔
334
        return self.get_service(key)
×
335

336
    def __iter__(self):
1✔
337
        return self._services
×
338

339

340
class ServicePlugin(Plugin):
1✔
341
    service: Service
1✔
342
    api: str
1✔
343

344
    @abc.abstractmethod
1✔
345
    def create_service(self) -> Service:
1✔
346
        raise NotImplementedError
347

348
    def load(self):
1✔
349
        self.service = self.create_service()
1✔
350
        return self.service
1✔
351

352

353
class ServicePluginAdapter(ServicePlugin):
1✔
354
    def __init__(
1✔
355
        self,
356
        api: str,
357
        create_service: Callable[[], Service],
358
        should_load: Callable[[], bool] = None,
359
    ) -> None:
360
        super().__init__()
1✔
361
        self.api = api
1✔
362
        self._create_service = create_service
1✔
363
        self._should_load = should_load
1✔
364

365
    def should_load(self) -> bool:
1✔
366
        if self._should_load:
1✔
367
            return self._should_load()
×
368
        return True
1✔
369

370
    def create_service(self) -> Service:
1✔
371
        return self._create_service()
1✔
372

373

374
def aws_provider(api: str = None, name="default", should_load: Callable[[], bool] = None):
1✔
375
    """
376
    Decorator for marking methods that create a Service instance as a ServicePlugin. Methods marked with this
377
    decorator are discoverable as a PluginSpec within the namespace "localstack.aws.provider", with the name
378
    "<api>:<name>". If api is not explicitly specified, then the method name is used as api name.
379
    """
380

381
    def wrapper(fn):
1✔
382
        # sugar for being able to name the function like the api
383
        _api = api or fn.__name__
1✔
384

385
        # this causes the plugin framework into pointing the entrypoint to the original function rather than the
386
        # nested factory function
387
        @functools.wraps(fn)
1✔
388
        def factory() -> ServicePluginAdapter:
1✔
389
            return ServicePluginAdapter(api=_api, should_load=should_load, create_service=fn)
1✔
390

391
        return PluginSpec(PLUGIN_NAMESPACE, f"{_api}:{name}", factory=factory)
1✔
392

393
    return wrapper
1✔
394

395

396
class ServicePluginErrorCollector(PluginLifecycleListener):
1✔
397
    """
398
    A PluginLifecycleListener that collects errors related to service plugins.
399
    """
400

401
    errors: dict[tuple[str, str], Exception]  # keys are: (api, provider)
1✔
402

403
    def __init__(self, errors: dict[str, Exception] = None) -> None:
1✔
404
        super().__init__()
1✔
405
        self.errors = errors or {}
1✔
406

407
    def get_key(self, plugin_name) -> tuple[str, str]:
1✔
408
        # the convention is <api>:<provider>, currently we don't really expose the provider
409
        # TODO: faulty plugin names would break this
UNCOV
410
        return tuple(plugin_name.split(":", maxsplit=1))
×
411

412
    def on_resolve_exception(self, namespace: str, entrypoint, exception: Exception):
1✔
UNCOV
413
        self.errors[self.get_key(entrypoint.name)] = exception
×
414

415
    def on_init_exception(self, plugin_spec: PluginSpec, exception: Exception):
1✔
416
        self.errors[self.get_key(plugin_spec.name)] = exception
×
417

418
    def on_load_exception(self, plugin_spec: PluginSpec, plugin: Plugin, exception: Exception):
1✔
419
        self.errors[self.get_key(plugin_spec.name)] = exception
×
420

421
    def has_errors(self, api: str, provider: str = None) -> bool:
1✔
422
        for e_api, e_provider in self.errors.keys():
1✔
UNCOV
423
            if api == e_api:
×
424
                if not provider:
×
425
                    return True
×
426
                else:
427
                    return e_provider == provider
×
428

429
        return False
1✔
430

431

432
class ServicePluginManager(ServiceManager):
1✔
433
    plugin_manager: PluginManager[ServicePlugin]
1✔
434
    plugin_errors: ServicePluginErrorCollector
1✔
435

436
    def __init__(
1✔
437
        self,
438
        plugin_manager: PluginManager[ServicePlugin] = None,
439
        provider_config: ServiceProviderConfig = None,
440
    ) -> None:
441
        super().__init__()
1✔
442
        self.plugin_errors = ServicePluginErrorCollector()
1✔
443
        self.plugin_manager = plugin_manager or PluginManager(
1✔
444
            PLUGIN_NAMESPACE, listener=self.plugin_errors
445
        )
446
        self._api_provider_specs = None
1✔
447
        self.provider_config = provider_config or config.SERVICE_PROVIDER_CONFIG
1✔
448

449
        # locks used to make sure plugin loading is thread safe - will be cleared after single use
450
        self._plugin_load_locks: dict[str, threading.RLock] = SynchronizedDefaultDict(
1✔
451
            threading.RLock
452
        )
453

454
    def get_active_provider(self, service: str) -> str:
1✔
455
        """
456
        Get configured provider for a given service
457

458
        :param service: Service name
459
        :return: configured provider
460
        """
461
        return self.provider_config.get_provider(service)
1✔
462

463
    def get_default_provider(self) -> str:
1✔
464
        """
465
        Get the default provider
466

467
        :return: default provider
468
        """
469
        return self.provider_config.default_value
×
470

471
    # TODO make the abstraction clearer, to provide better information if service is available versus discoverable
472
    # especially important when considering pro services
473
    def list_available(self) -> list[str]:
1✔
474
        """
475
        List all available services, which have an available, configured provider
476

477
        :return: List of service names
478
        """
479
        return [
1✔
480
            service
481
            for service, providers in self.api_provider_specs.items()
482
            if self.get_active_provider(service) in providers
483
        ]
484

485
    def _get_loaded_service_containers(
1✔
486
        self, services: list[str] | None = None
487
    ) -> list[ServiceContainer]:
488
        """
489
        Returns all the available service containers.
490
        :param services: the list of services to restrict the search to. If empty or NULL then service containers for
491
                         all available services are queried.
492
        :return: a list of all the available service containers.
493
        """
494
        services = services or self.list_available()
1✔
495
        return [
1✔
496
            c for s in services if (c := super(ServicePluginManager, self).get_service_container(s))
497
        ]
498

499
    def list_loaded_services(self) -> list[str]:
1✔
500
        """
501
        Lists all the services which have a provider that has been initialized
502

503
        :return: a list of service names
504
        """
505
        return [
×
506
            service_container.service.name()
507
            for service_container in self._get_loaded_service_containers()
508
        ]
509

510
    def list_active_services(self) -> list[str]:
1✔
511
        """
512
        Lists all services that have an initialised provider and are currently running.
513

514
        :return: the list of active service names.
515
        """
516
        return [
×
517
            service_container.service.name()
518
            for service_container in self._get_loaded_service_containers()
519
            if service_container.state == ServiceState.RUNNING
520
        ]
521

522
    def exists(self, name: str) -> bool:
1✔
523
        return name in self.list_available()
1✔
524

525
    def get_state(self, name: str) -> ServiceState | None:
1✔
526
        if name in self._services:
1✔
527
            # ServiceContainer exists, which means the plugin has been loaded
528
            return super().get_state(name)
1✔
529

530
        if not self.exists(name):
1✔
531
            # there's definitely no service with this name
532
            return None
×
533

534
        # if a PluginSpec exists, then we can get the container and check whether there was an error loading the plugin
535
        provider = self.get_active_provider(name)
1✔
536
        if self.plugin_errors.has_errors(name, provider):
1✔
537
            return ServiceState.ERROR
×
538

539
        return ServiceState.AVAILABLE if is_api_enabled(name) else ServiceState.DISABLED
1✔
540

541
    def get_service_container(self, name: str) -> ServiceContainer | None:
1✔
542
        if container := self._services.get(name):
1✔
543
            return container
1✔
544

545
        if not self.exists(name):
1✔
546
            return None
×
547

548
        load_lock = self._plugin_load_locks[name]
1✔
549
        with load_lock:
1✔
550
            # check once again to avoid race conditions
551
            if container := self._services.get(name):
1✔
552
                return container
1✔
553

554
            # this is where we start lazy loading. we now know the PluginSpec for the API exists,
555
            # but the ServiceContainer has not been created.
556
            # this control path will be executed once per service
557
            plugin = self._load_service_plugin(name)
1✔
558
            if not plugin or not plugin.service:
1✔
559
                return None
×
560

561
            with self._mutex:
1✔
562
                super().add_service(plugin.service)
1✔
563

564
            del self._plugin_load_locks[name]  # we only needed the service lock once
1✔
565

566
            return self._services.get(name)
1✔
567

568
    @property
1✔
569
    def api_provider_specs(self) -> dict[str, list[str]]:
1✔
570
        """
571
        Returns all provider names within the service plugin namespace and parses their name according to the convention,
572
        that is "<api>:<provider>". The result is a dictionary that maps api => List[str (name of a provider)].
573
        """
574
        if self._api_provider_specs is not None:
1✔
575
            return self._api_provider_specs
1✔
576

577
        with self._mutex:
1✔
578
            if self._api_provider_specs is None:
1✔
579
                self._api_provider_specs = self._resolve_api_provider_specs()
1✔
580
            return self._api_provider_specs
1✔
581

582
    @log_duration()
1✔
583
    def _load_service_plugin(self, name: str) -> ServicePlugin | None:
1✔
584
        providers = self.api_provider_specs.get(name)
1✔
585
        if not providers:
1✔
586
            # no providers for this api
587
            return None
×
588

589
        preferred_provider = self.get_active_provider(name)
1✔
590
        if preferred_provider in providers:
1✔
591
            provider = preferred_provider
1✔
592
        else:
593
            default = self.get_default_provider()
×
594
            LOG.warning(
×
595
                "Configured provider (%s) does not exist for service (%s). Available options are: %s. "
596
                "Falling back to default provider '%s'. This can impact the availability of Pro functionality, "
597
                "please fix this configuration issue as soon as possible.",
598
                preferred_provider,
599
                name,
600
                providers,
601
                default,
602
            )
603
            provider = default
×
604

605
        plugin_name = f"{name}:{provider}"
1✔
606
        plugin = self.plugin_manager.load(plugin_name)
1✔
607
        plugin.name = plugin_name
1✔
608

609
        return plugin
1✔
610

611
    @log_duration()
1✔
612
    def _resolve_api_provider_specs(self) -> dict[str, list[str]]:
1✔
613
        result = defaultdict(list)
1✔
614

615
        for spec in self.plugin_manager.list_plugin_specs():
1✔
616
            api, provider = spec.name.split(
1✔
617
                ":"
618
            )  # TODO: error handling, faulty plugins could break the runtime
619
            result[api].append(provider)
1✔
620

621
        return result
1✔
622

623
    def apis_with_provider(self, provider: str) -> list[str]:
1✔
624
        """
625
        Lists all apis where a given provider exists for.
626
        :param provider: Name of the provider
627
        :return: List of apis the given provider provides
628
        """
629
        apis = []
×
630
        for api, providers in self.api_provider_specs.items():
×
631
            if provider in providers:
×
632
                apis.append(api)
×
633
        return apis
×
634

635
    def _stop_services(self, service_containers: list[ServiceContainer]) -> None:
1✔
636
        """
637
        Atomically attempts to stop all given 'ServiceState.STARTING' and 'ServiceState.RUNNING' services.
638
        :param service_containers: the list of service containers to be stopped.
639
        """
640
        target_service_states = {ServiceState.STARTING, ServiceState.RUNNING}
1✔
641
        with self._mutex:
1✔
642
            for service_container in service_containers:
1✔
643
                if service_container.state in target_service_states:
1✔
644
                    service_container.stop()
1✔
645

646
    def stop_services(self, services: list[str] = None):
1✔
647
        """
648
        Stops services for this service manager, if they are currently active.
649
        Will not stop services not already started or in and error state.
650

651
        :param services: Service names to stop. If not provided, all services for this manager will be stopped.
652
        """
653
        target_service_containers = self._get_loaded_service_containers(services=services)
×
654
        self._stop_services(target_service_containers)
×
655

656
    def stop_all_services(self) -> None:
1✔
657
        """
658
        Stops all services for this service manager, if they are currently active.
659
        Will not stop services not already started or in and error state.
660
        """
661
        target_service_containers = self._get_loaded_service_containers()
1✔
662
        self._stop_services(target_service_containers)
1✔
663

664

665
# map of service plugins, mapping from service name to plugin details
666
SERVICE_PLUGINS: ServicePluginManager = ServicePluginManager()
1✔
667

668

669
# -----------------------------
670
# INFRASTRUCTURE HEALTH CHECKS
671
# -----------------------------
672

673

674
def wait_for_infra_shutdown():
1✔
675
    apis = get_enabled_apis()
×
676

677
    names = [name for name, plugin in SERVICE_PLUGINS.items() if name in apis]
×
678

679
    def check(name):
×
680
        check_service_health(api=name, expect_shutdown=True)
×
681
        LOG.debug("[shutdown] api %s has shut down", name)
×
682

683
    # no special significance to 10 workers, seems like a reasonable number given the number of services we have
684
    with ThreadPoolExecutor(max_workers=10) as executor:
×
685
        executor.map(check, names)
×
686

687

688
def check_service_health(api, expect_shutdown=False):
1✔
689
    status = SERVICE_PLUGINS.check(api)
×
690
    if status == expect_shutdown:
×
691
        if not expect_shutdown:
×
692
            LOG.warning('Service "%s" not yet available, retrying...', api)
×
693
        else:
694
            LOG.warning('Service "%s" still shutting down, retrying...', api)
×
695
        raise Exception(f"Service check failed for api: {api}")
×
696

697

698
@hooks.on_infra_start(should_load=lambda: config.EAGER_SERVICE_LOADING)
1✔
699
def eager_load_services():
1✔
700
    from localstack.utils.bootstrap import get_preloaded_services
×
701

702
    preloaded_apis = get_preloaded_services()
×
703
    LOG.debug("Eager loading services: %s", sorted(preloaded_apis))
×
704

705
    for api in preloaded_apis:
×
706
        try:
×
707
            SERVICE_PLUGINS.require(api)
×
708
        except ServiceDisabled as e:
×
709
            LOG.debug("%s", e)
×
710
        except Exception:
×
711
            LOG.error(
×
712
                "could not load service plugin %s",
713
                api,
714
                exc_info=LOG.isEnabledFor(logging.DEBUG),
715
            )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc