• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 6523d139-4c8d-4daf-a514-97baaa202bd9

04 Jun 2025 04:30PM UTC coverage: 86.762% (-0.006%) from 86.768%
6523d139-4c8d-4daf-a514-97baaa202bd9

push

circleci

web-flow
test(esm/sqs): Skip flaky test_report_batch_item_failures test (#12713)

65076 of 75005 relevant lines covered (86.76%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.99
/localstack-core/localstack/services/plugins.py
1
import abc
1✔
2
import functools
1✔
3
import logging
1✔
4
import threading
1✔
5
from collections import defaultdict
1✔
6
from concurrent.futures import ThreadPoolExecutor
1✔
7
from enum import Enum
1✔
8
from typing import Callable, Dict, List, Optional, Protocol, Tuple
1✔
9

10
from plux import Plugin, PluginLifecycleListener, PluginManager, PluginSpec
1✔
11

12
from localstack import config
1✔
13
from localstack.aws.skeleton import DispatchTable, Skeleton
1✔
14
from localstack.aws.spec import load_service
1✔
15
from localstack.config import ServiceProviderConfig
1✔
16
from localstack.runtime import hooks
1✔
17
from localstack.state import StateLifecycleHook, StateVisitable, StateVisitor
1✔
18
from localstack.utils.bootstrap import get_enabled_apis, is_api_enabled, log_duration
1✔
19
from localstack.utils.functions import call_safe
1✔
20
from localstack.utils.sync import SynchronizedDefaultDict, poll_condition
1✔
21

22
# set up logger
23
LOG = logging.getLogger(__name__)
1✔
24

25
# namespace for AWS provider plugins
26
PLUGIN_NAMESPACE = "localstack.aws.provider"
1✔
27

28
_default = object()  # sentinel object indicating a default value
1✔
29

30

31
# -----------------
32
# PLUGIN UTILITIES
33
# -----------------
34

35

36
class ServiceException(Exception):
    """Common base class for the service-related errors defined in this module."""
1✔
38

39

40
class ServiceDisabled(ServiceException):
    """Signals that a service was requested while it is in the disabled state."""
1✔
42

43

44
class ServiceStateException(ServiceException):
    """Signals that a service is in a state from which it cannot be made ready."""
1✔
46

47

48
class ServiceLifecycleHook(StateLifecycleHook):
    """
    Set of no-op callbacks for service lifecycle events. Subclasses override the callbacks they are interested in;
    every default implementation does nothing and returns ``None``.
    """

    def on_after_init(self):
        """Callback invoked at the end of ``Service.__init__``."""

    def on_before_start(self):
        """Callback invoked by ``Service.start`` before the start function (if any) is called."""

    def on_before_stop(self):
        """Callback invoked by ``Service.stop`` before the stop function (if any) is called."""

    def on_exception(self):
        """
        Callback for error situations.

        NOTE(review): nothing in this module invokes this callback - confirm the call sites elsewhere.
        """
×
60

61

62
class ServiceProvider(Protocol):
    """Structural type of a provider: any object that exposes a ``service`` attribute."""

    # name of the service the provider implements (used as the service name in ``Service.for_provider``)
    service: str
1✔
64

65

66
class Service:
    """
    FIXME: this has become frankenstein's monster, and it has to go. once we've rid ourselves of the legacy edge
     proxy, we can get rid of the ``listener`` concept. we should then do one iteration over all the
     ``start_dynamodb``, ``start_<whatever>``, ``check_<whatever>``, etc. methods, to make all of those integral part
     of the service provider. the assumption that every service provider starts a backend server is outdated, and then
     we can get rid of ``start``, and ``check``.
    """

    def __init__(
        self,
        name,
        start=_default,
        check=None,
        skeleton=None,
        active=False,
        stop=None,
        lifecycle_hook: ServiceLifecycleHook = None,
    ):
        """
        :param name: the plugin/service name, returned by ``name()``
        :param start: optional start function; the ``_default`` sentinel means "nothing to start"
        :param check: optional check function, called with ``expect_shutdown`` and ``print_error`` keyword arguments
        :param skeleton: optional skeleton, passed to the start function as ``update_listener``
        :param active: stored as ``default_active``
        :param stop: optional stop function, called without arguments
        :param lifecycle_hook: lifecycle callbacks; a no-op ``ServiceLifecycleHook`` is used if none is given
        """
        # fall back to the no-op hook so the lifecycle callbacks can always be invoked
        if not lifecycle_hook:
            lifecycle_hook = ServiceLifecycleHook()

        self.plugin_name = name
        self.skeleton = skeleton
        self.default_active = active
        self.start_function = start
        self.check_function = check
        self.stop_function = stop
        self.lifecycle_hook = lifecycle_hook
        self._provider = None

        call_safe(self.lifecycle_hook.on_after_init)

    def start(self, asynchronous):
        """
        Runs the ``on_before_start`` hook and then the start function, if a real one was configured.

        :param asynchronous: forwarded to the start function as keyword argument
        :return: the result of the start function, or ``None`` if there is nothing to start
        """
        call_safe(self.lifecycle_hook.on_before_start)

        starter = self.start_function
        # either no start function at all, or only the sentinel default: nothing to do
        if not starter or starter is _default:
            return None

        if self.skeleton:
            return starter(asynchronous=asynchronous, update_listener=self.skeleton)
        return starter(asynchronous=asynchronous)

    def stop(self):
        """
        Runs the ``on_before_stop`` hook and then the stop function, if one was configured.

        :return: the result of the stop function, or ``None`` if there is none
        """
        call_safe(self.lifecycle_hook.on_before_stop)
        if self.stop_function:
            return self.stop_function()
        return None

    def check(self, expect_shutdown=False, print_error=False):
        """
        Delegates to the check function, if one was configured.

        :param expect_shutdown: forwarded to the check function
        :param print_error: forwarded to the check function
        :return: the result of the check function, or ``None`` if there is none
        """
        if self.check_function:
            return self.check_function(expect_shutdown=expect_shutdown, print_error=print_error)
        return None

    def name(self):
        """Returns the plugin name this service was created with."""
        return self.plugin_name

    def is_enabled(self):
        """Returns the result of ``is_api_enabled`` for this service's plugin name."""
        return is_api_enabled(self.plugin_name)

    def accept_state_visitor(self, visitor: StateVisitor):
        """
        Passes the StateVisitor to the ASF provider if it is set and implements the StateVisitable. Otherwise, it uses
        the ReflectionStateLocator to visit the service state.

        :param visitor: the visitor
        """
        provider = self._provider
        if provider and isinstance(provider, StateVisitable):
            provider.accept_state_visitor(visitor)
        else:
            # imported lazily, only needed for the fallback path
            from localstack.state.inspect import ReflectionStateLocator

            locator = ReflectionStateLocator(service=self.name())
            locator.accept_state_visitor(visitor)

    @staticmethod
    def for_provider(
        provider: ServiceProvider,
        dispatch_table_factory: Callable[[ServiceProvider], DispatchTable] = None,
        service_lifecycle_hook: ServiceLifecycleHook = None,
    ) -> "Service":
        """
        Factory method for creating services for providers. This method hides a bunch of legacy code and
        band-aids/adapters to make persistence visitors work, while providing compatibility with the legacy edge proxy.

        :param provider: the service provider, i.e., the implementation of the generated ASF service API.
        :param dispatch_table_factory: a `MotoFallbackDispatcher` or something similar that uses the provider to
            create a dispatch table. this one's a bit clumsy.
        :param service_lifecycle_hook: if left empty, the factory checks whether the provider is a ServiceLifecycleHook.
        :return: a service instance
        """
        # a provider that is itself a lifecycle hook is used as the hook, unless one was passed explicitly
        if service_lifecycle_hook is None and isinstance(provider, ServiceLifecycleHook):
            service_lifecycle_hook = provider

        # the delegate is what gets injected into the skeleton
        if dispatch_table_factory:
            delegate = dispatch_table_factory(provider)
        else:
            delegate = provider

        service = Service(
            name=provider.service,
            skeleton=Skeleton(load_service(provider.service), delegate),
            lifecycle_hook=service_lifecycle_hook,
        )
        service._provider = provider
        return service
1✔
172

173

174
class ServiceState(Enum):
    """States a service can be in, as tracked by its ``ServiceContainer``."""

    # default state of a freshly created container
    UNKNOWN = "unknown"
    # registered and enabled, but not started
    AVAILABLE = "available"
    # registered, but the API is not enabled
    DISABLED = "disabled"
    # start has been requested and is in progress
    STARTING = "starting"
    # the last check of the service succeeded
    RUNNING = "running"
    # stop has been requested and is in progress
    STOPPING = "stopping"
    # the service was stopped successfully
    STOPPED = "stopped"
    # starting, checking, or stopping the service raised an exception
    ERROR = "error"
1✔
183

184

185
class ServiceContainer:
    """
    Holds a service, its state, and exposes lifecycle methods of the service.

    Every lifecycle method catches exceptions raised by the underlying service, moves the container into
    ``ServiceState.ERROR``, records the exception in ``errors`` and logs it.
    """

    # the wrapped service
    service: Service
    # current lifecycle state of the service
    state: ServiceState
    # re-entrant lock created per container; the methods of this class do not acquire it themselves
    lock: threading.RLock
    # exceptions captured by the lifecycle methods, oldest first
    errors: List[Exception]

    def __init__(self, service: Service, state=ServiceState.UNKNOWN):
        """
        :param service: the service to wrap
        :param state: the initial state of the container
        """
        self.service = service
        self.state = state
        self.lock = threading.RLock()
        self.errors = []

    def get(self) -> Service:
        """Returns the wrapped service."""
        return self.service

    def start(self) -> bool:
        """
        Starts the service asynchronously and then checks it.

        :return: ``True`` if the service was started and the subsequent check succeeded, ``False`` otherwise
        """
        try:
            self.state = ServiceState.STARTING
            self.service.start(asynchronous=True)
        except Exception as e:
            self.state = ServiceState.ERROR
            self.errors.append(e)
            LOG.error("error while starting service %s: %s", self.service.name(), e)
            return False
        return self.check()

    def check(self) -> bool:
        """
        Runs the service check and updates the state accordingly.

        :return: ``True`` (state ``RUNNING``) if the check did not raise, ``False`` (state ``ERROR``) otherwise
        """
        try:
            self.service.check(print_error=True)
            self.state = ServiceState.RUNNING
            return True
        except Exception as e:
            self.state = ServiceState.ERROR
            self.errors.append(e)
            LOG.error("error while checking service %s: %s", self.service.name(), e)
            return False

    def stop(self):
        """
        Stops the service. On success the state becomes ``STOPPED``; if stopping raises, the state becomes
        ``ERROR`` and the exception is recorded and logged (it is not re-raised).
        """
        try:
            self.state = ServiceState.STOPPING
            self.service.stop()
            self.state = ServiceState.STOPPED
        except Exception as e:
            self.state = ServiceState.ERROR
            self.errors.append(e)
            # log the failure like start() and check() do, instead of swallowing it silently
            LOG.error("error while stopping service %s: %s", self.service.name(), e)
×
234

235

236
class ServiceManager:
    """
    Registry of services, keyed by service name. Each service is wrapped in a ``ServiceContainer`` that tracks its
    state. Besides the lifecycle API, the class exposes a small dict-like API (``items``, ``keys``, ``values``,
    ``get``, iteration) for legacy callers.
    """

    def __init__(self) -> None:
        super().__init__()
        # service name => container of the registered service
        self._services: Dict[str, ServiceContainer] = {}
        self._mutex = threading.RLock()

    def get_service_container(self, name: str) -> Optional[ServiceContainer]:
        """Returns the container registered under the given name, or ``None``."""
        return self._services.get(name)

    def get_service(self, name: str) -> Optional[Service]:
        """Returns the service registered under the given name, or ``None``."""
        container = self.get_service_container(name)
        return container.service if container else None

    def add_service(self, service: Service) -> bool:
        """
        Registers the service under its name, replacing any previous registration. The initial state is
        ``AVAILABLE`` if the service is enabled, ``DISABLED`` otherwise.

        :return: always ``True``
        """
        state = ServiceState.AVAILABLE if service.is_enabled() else ServiceState.DISABLED
        self._services[service.name()] = ServiceContainer(service, state)

        return True

    def list_available(self) -> List[str]:
        """Returns the names of all registered services."""
        return list(self._services.keys())

    def exists(self, name: str) -> bool:
        """Returns whether a service is registered under the given name."""
        return name in self._services

    def is_running(self, name: str) -> bool:
        """Returns whether the state of the given service is ``RUNNING``."""
        return self.get_state(name) == ServiceState.RUNNING

    def check(self, name: str) -> Optional[bool]:
        """
        Re-checks a service that is in state ``RUNNING`` or ``ERROR``.

        :return: the result of the container check, or ``None`` if the service is in any other state (or unknown)
        """
        if self.get_state(name) in [ServiceState.RUNNING, ServiceState.ERROR]:
            return self.get_service_container(name).check()
        return None

    def check_all(self):
        """
        Returns whether the check of at least one available service succeeds. Note that ``any`` stops at the first
        successful check, so later services are not necessarily checked.
        """
        return any(self.check(service_name) for service_name in self.list_available())

    def get_state(self, name: str) -> Optional[ServiceState]:
        """Returns the state of the given service, or ``None`` if there is no container for it."""
        container = self.get_service_container(name)
        return container.state if container else None

    def get_states(self) -> Dict[str, ServiceState]:
        """Returns a mapping of service name to state for all available services."""
        return {name: self.get_state(name) for name in self.list_available()}

    @log_duration()
    def require(self, name: str) -> Service:
        """
        High level function that always returns a running service, or raises an error. If the service is in a state
        that it could be transitioned into a running state, then invoking this function will attempt that transition,
        e.g., by starting the service if it is available.

        :param name: the name of the service
        :return: the running service
        :raises ValueError: if there is no service with that name
        :raises TimeoutError: if the service does not leave the STARTING/STOPPING state within 30 seconds
        :raises ServiceDisabled: if the service is disabled
        :raises ServiceStateException: if the service is in a state from which it cannot be started
        :raises Exception: the most recently captured error, if the service is (or ends up) in the ERROR state
        """
        container = self.get_service_container(name)

        if not container:
            raise ValueError("no such service %s" % name)

        # wait for transitions that are currently in flight before inspecting the state
        if container.state == ServiceState.STARTING:
            if not poll_condition(lambda: container.state != ServiceState.STARTING, timeout=30):
                raise TimeoutError("gave up waiting for service %s to start" % name)

        if container.state == ServiceState.STOPPING:
            if not poll_condition(lambda: container.state == ServiceState.STOPPED, timeout=30):
                raise TimeoutError("gave up waiting for service %s to stop" % name)

        with container.lock:
            if container.state == ServiceState.DISABLED:
                raise ServiceDisabled("service %s is disabled" % name)

            if container.state == ServiceState.RUNNING:
                return container.service

            if container.state == ServiceState.ERROR:
                # raise any capture error
                raise container.errors[-1]

            if container.state == ServiceState.AVAILABLE or container.state == ServiceState.STOPPED:
                if container.start():
                    return container.service
                else:
                    raise container.errors[-1]

        raise ServiceStateException(
            "service %s is not ready (%s) and could not be started" % (name, container.state)
        )

    # legacy map compatibility

    def items(self):
        """Returns ``(service name, service)`` pairs of all registered services."""
        return {
            container.service.name(): container.service for container in self._services.values()
        }.items()

    def keys(self):
        """Returns the names under which services are registered."""
        return self._services.keys()

    def values(self):
        """Returns the list of all registered services."""
        return [container.service for container in self._services.values()]

    def get(self, key):
        """Dict-style alias of ``get_service``."""
        return self.get_service(key)

    def __iter__(self):
        """Iterates over the names of the registered services, like iterating over a dict."""
        # __iter__ must return an iterator; returning the dict itself makes iter(manager) raise a TypeError
        return iter(self._services)
×
337

338

339
class ServicePlugin(Plugin):
    """Plugin that produces a ``Service`` when loaded; subclasses implement ``create_service``."""

    # the service created by the most recent call to ``load``
    service: Service
    # name of the API the plugin provides
    api: str

    @abc.abstractmethod
    def create_service(self) -> Service:
        """Creates the service instance of this plugin."""
        raise NotImplementedError

    def load(self):
        """Creates the service, stores it in ``self.service`` and returns it."""
        created = self.create_service()
        self.service = created
        return created
1✔
350

351

352
class ServicePluginAdapter(ServicePlugin):
    """``ServicePlugin`` backed by plain callables instead of overridden methods."""

    def __init__(
        self,
        api: str,
        create_service: Callable[[], Service],
        should_load: Callable[[], bool] = None,
    ) -> None:
        """
        :param api: name of the API the plugin provides
        :param create_service: callable that creates the service
        :param should_load: optional callable deciding whether the plugin should be loaded
        """
        super().__init__()
        self.api = api
        self._create_service = create_service
        self._should_load = should_load

    def should_load(self) -> bool:
        """Returns the result of the ``should_load`` callable, or ``True`` if none was given."""
        predicate = self._should_load
        return predicate() if predicate else True

    def create_service(self) -> Service:
        """Creates the service by invoking the ``create_service`` callable."""
        return self._create_service()
1✔
371

372

373
def aws_provider(api: str = None, name="default", should_load: Callable[[], bool] = None):
    """
    Decorator that turns a function creating a Service instance into a ServicePlugin. The decorated function is
    replaced by a PluginSpec in the namespace "localstack.aws.provider" with the name "<api>:<name>". When no api
    is given explicitly, the name of the decorated function serves as api name.

    :param api: the api name; defaults to the name of the decorated function
    :param name: the provider name
    :param should_load: optional callable deciding whether the plugin should be loaded
    :return: the decorator
    """

    def wrapper(fn):
        # sugar: the function may simply be named like the api
        api_name = api if api else fn.__name__
        spec_name = f"{api_name}:{name}"

        # functools.wraps makes the plugin framework point the entrypoint to the original function rather than
        # to this nested factory function
        @functools.wraps(fn)
        def factory() -> ServicePluginAdapter:
            return ServicePluginAdapter(api=api_name, should_load=should_load, create_service=fn)

        return PluginSpec(PLUGIN_NAMESPACE, spec_name, factory=factory)

    return wrapper
1✔
393

394

395
class ServicePluginErrorCollector(PluginLifecycleListener):
    """
    A PluginLifecycleListener that collects errors related to service plugins.

    Errors are stored per plugin; a later error for the same plugin replaces the earlier one.
    """

    errors: Dict[Tuple[str, str], Exception]  # keys are: (api, provider)

    def __init__(self, errors: Dict[Tuple[str, str], Exception] = None) -> None:
        """
        :param errors: optional initial mapping of ``(api, provider)`` to exception
        """
        super().__init__()
        self.errors = errors or {}

    def get_key(self, plugin_name) -> Tuple[str, str]:
        """
        Splits a plugin name of the form ``<api>:<provider>`` at the first colon.

        :param plugin_name: the plugin name
        :return: the ``(api, provider)`` tuple; the provider is an empty string if the name contains no colon
        """
        # the convention is <api>:<provider>, currently we don't really expose the provider.
        # partition always yields exactly two parts, so faulty names (no colon) still produce a 2-tuple
        api, _, provider = plugin_name.partition(":")
        return api, provider

    def on_resolve_exception(self, namespace: str, entrypoint, exception: Exception):
        """Records an exception raised while resolving the plugin entrypoint."""
        self.errors[self.get_key(entrypoint.name)] = exception

    def on_init_exception(self, plugin_spec: PluginSpec, exception: Exception):
        """Records an exception raised while initializing the plugin."""
        self.errors[self.get_key(plugin_spec.name)] = exception

    def on_load_exception(self, plugin_spec: PluginSpec, plugin: Plugin, exception: Exception):
        """Records an exception raised while loading the plugin."""
        self.errors[self.get_key(plugin_spec.name)] = exception

    def has_errors(self, api: str, provider: str = None) -> bool:
        """
        Returns whether an error was collected for the given api.

        :param api: the api name
        :param provider: if given, only errors of this provider are considered
        :return: ``True`` if a matching error exists
        """
        for e_api, e_provider in self.errors:
            if api != e_api:
                continue
            # keep scanning on a provider mismatch: several providers of the same api may have failed
            if not provider or e_provider == provider:
                return True

        return False
1✔
429

430

431
class ServicePluginManager(ServiceManager):
    """
    ServiceManager that discovers services through plugins in the ``PLUGIN_NAMESPACE`` namespace and loads them
    lazily: the plugin of a service is loaded (and its container created) the first time the container is requested.
    """

    plugin_manager: PluginManager[ServicePlugin]
    plugin_errors: ServicePluginErrorCollector

    def __init__(
        self,
        plugin_manager: PluginManager[ServicePlugin] = None,
        provider_config: ServiceProviderConfig = None,
    ) -> None:
        """
        :param plugin_manager: the plugin manager to use; by default one is created for ``PLUGIN_NAMESPACE`` that
            reports errors to ``self.plugin_errors``
        :param provider_config: the provider configuration; defaults to ``config.SERVICE_PROVIDER_CONFIG``
        """
        super().__init__()
        self.plugin_errors = ServicePluginErrorCollector()
        self.plugin_manager = plugin_manager or PluginManager(
            PLUGIN_NAMESPACE, listener=self.plugin_errors
        )
        # lazily resolved cache behind the ``api_provider_specs`` property
        self._api_provider_specs = None
        self.provider_config = provider_config or config.SERVICE_PROVIDER_CONFIG

        # locks used to make sure plugin loading is thread safe - will be cleared after single use
        self._plugin_load_locks: Dict[str, threading.RLock] = SynchronizedDefaultDict(
            threading.RLock
        )

    def get_active_provider(self, service: str) -> str:
        """
        Get configured provider for a given service

        :param service: Service name
        :return: configured provider
        """
        return self.provider_config.get_provider(service)

    def get_default_provider(self) -> str:
        """
        Get the default provider

        :return: default provider
        """
        return self.provider_config.default_value

    # TODO make the abstraction clearer, to provide better information if service is available versus discoverable
    # especially important when considering pro services
    def list_available(self) -> List[str]:
        """
        List all available services, which have an available, configured provider

        :return: List of service names
        """
        return [
            service
            for service, providers in self.api_provider_specs.items()
            if self.get_active_provider(service) in providers
        ]

    def _get_loaded_service_containers(
        self, services: Optional[List[str]] = None
    ) -> List[ServiceContainer]:
        """
        Returns the service containers that have already been created, without triggering any plugin loading.

        :param services: the list of services to restrict the search to. If empty or NULL then service containers for
                         all available services are queried.
        :return: a list of the already loaded service containers.
        """
        services = services or self.list_available()
        # the base class lookup only consults the registry, whereas the override in this class would lazily
        # load the plugin
        return [
            c for s in services if (c := super(ServicePluginManager, self).get_service_container(s))
        ]

    def list_loaded_services(self) -> List[str]:
        """
        Lists all the services which have a provider that has been initialized

        :return: a list of service names
        """
        return [
            service_container.service.name()
            for service_container in self._get_loaded_service_containers()
        ]

    def list_active_services(self) -> List[str]:
        """
        Lists all services that have an initialised provider and are currently running.

        :return: the list of active service names.
        """
        return [
            service_container.service.name()
            for service_container in self._get_loaded_service_containers()
            if service_container.state == ServiceState.RUNNING
        ]

    def exists(self, name: str) -> bool:
        """Returns whether the given service is among the available services."""
        return name in self.list_available()

    def get_state(self, name: str) -> Optional[ServiceState]:
        """
        Returns the state of the given service without loading its plugin.

        :param name: the service name
        :return: the container state if the plugin has been loaded; otherwise ``None`` if the service does not
            exist, ``ERROR`` if a plugin error was collected for the active provider, and ``AVAILABLE`` or
            ``DISABLED`` depending on whether the api is enabled
        """
        if name in self._services:
            # ServiceContainer exists, which means the plugin has been loaded
            return super().get_state(name)

        if not self.exists(name):
            # there's definitely no service with this name
            return None

        # if a PluginSpec exists, then we can get the container and check whether there was an error loading the plugin
        provider = self.get_active_provider(name)
        if self.plugin_errors.has_errors(name, provider):
            return ServiceState.ERROR

        return ServiceState.AVAILABLE if is_api_enabled(name) else ServiceState.DISABLED

    def get_service_container(self, name: str) -> Optional[ServiceContainer]:
        """
        Returns the container of the given service, loading the service plugin first if that has not happened yet.

        :param name: the service name
        :return: the container, or ``None`` if the service does not exist or its plugin provides no service
        """
        if container := self._services.get(name):
            return container

        if not self.exists(name):
            return None

        load_lock = self._plugin_load_locks[name]
        with load_lock:
            # check once again to avoid race conditions
            if container := self._services.get(name):
                return container

            # this is where we start lazy loading. we now know the PluginSpec for the API exists,
            # but the ServiceContainer has not been created.
            # this control path will be executed once per service
            plugin = self._load_service_plugin(name)
            if not plugin or not plugin.service:
                return None

            with self._mutex:
                super().add_service(plugin.service)

            del self._plugin_load_locks[name]  # we only needed the service lock once

            return self._services.get(name)

    @property
    def api_provider_specs(self) -> Dict[str, List[str]]:
        """
        Returns all provider names within the service plugin namespace and parses their name according to the convention,
        that is "<api>:<provider>". The result is a dictionary that maps api => List[str (name of a provider)].
        """
        if self._api_provider_specs is not None:
            return self._api_provider_specs

        with self._mutex:
            # check again while holding the mutex, another thread may have resolved the specs in the meantime
            if self._api_provider_specs is None:
                self._api_provider_specs = self._resolve_api_provider_specs()
            return self._api_provider_specs

    @log_duration()
    def _load_service_plugin(self, name: str) -> Optional[ServicePlugin]:
        """
        Loads the plugin of the configured provider for the given api, falling back to the default provider (with a
        warning) if the configured provider does not exist for that api.

        :param name: the api name
        :return: the loaded plugin, or ``None`` if there are no providers for the api
        """
        providers = self.api_provider_specs.get(name)
        if not providers:
            # no providers for this api
            return None

        preferred_provider = self.get_active_provider(name)
        if preferred_provider in providers:
            provider = preferred_provider
        else:
            default = self.get_default_provider()
            LOG.warning(
                "Configured provider (%s) does not exist for service (%s). Available options are: %s. "
                "Falling back to default provider '%s'. This can impact the availability of Pro functionality, "
                "please fix this configuration issue as soon as possible.",
                preferred_provider,
                name,
                providers,
                default,
            )
            provider = default

        plugin_name = f"{name}:{provider}"
        plugin = self.plugin_manager.load(plugin_name)
        plugin.name = plugin_name

        return plugin

    @log_duration()
    def _resolve_api_provider_specs(self) -> Dict[str, List[str]]:
        """
        Builds the api => provider names mapping from the plugin specs known to the plugin manager.

        Plugin names are split at the first colon. Names that do not contain a colon violate the
        "<api>:<provider>" convention; they are skipped with a warning instead of breaking the runtime.
        """
        result = defaultdict(list)

        for spec in self.plugin_manager.list_plugin_specs():
            api, separator, provider = spec.name.partition(":")
            if not separator:
                LOG.warning(
                    "Ignoring service plugin with malformed name '%s', expected '<api>:<provider>'",
                    spec.name,
                )
                continue
            result[api].append(provider)

        return result

    def apis_with_provider(self, provider: str) -> List[str]:
        """
        Lists all apis where a given provider exists for.
        :param provider: Name of the provider
        :return: List of apis the given provider provides
        """
        return [api for api, providers in self.api_provider_specs.items() if provider in providers]

    def _stop_services(self, service_containers: List[ServiceContainer]) -> None:
        """
        Attempts to stop all given 'ServiceState.STARTING' and 'ServiceState.RUNNING' services while holding the
        manager mutex.
        :param service_containers: the list of service containers to be stopped.
        """
        target_service_states = {ServiceState.STARTING, ServiceState.RUNNING}
        with self._mutex:
            for service_container in service_containers:
                if service_container.state in target_service_states:
                    service_container.stop()

    def stop_services(self, services: List[str] = None):
        """
        Stops services for this service manager, if they are currently active.
        Will not stop services not already started or in an error state.

        :param services: Service names to stop. If not provided, all services for this manager will be stopped.
        """
        target_service_containers = self._get_loaded_service_containers(services=services)
        self._stop_services(target_service_containers)

    def stop_all_services(self) -> None:
        """
        Stops all services for this service manager, if they are currently active.
        Will not stop services not already started or in an error state.
        """
        target_service_containers = self._get_loaded_service_containers()
        self._stop_services(target_service_containers)
1✔
662

663

664
# map of service plugins, mapping from service name to plugin details
665
SERVICE_PLUGINS: ServicePluginManager = ServicePluginManager()
1✔
666

667

668
# -----------------------------
669
# INFRASTRUCTURE HEALTH CHECKS
670
# -----------------------------
671

672

673
def wait_for_infra_shutdown():
    """
    Runs a shutdown health check for every registered service whose api is enabled, and blocks until all checks
    have finished. Exceptions raised by individual checks are not propagated, since the results of the submitted
    tasks are never retrieved.
    """
    enabled_apis = get_enabled_apis()
    names = [
        service_name for service_name, _ in SERVICE_PLUGINS.items() if service_name in enabled_apis
    ]

    def check(name):
        check_service_health(api=name, expect_shutdown=True)
        LOG.debug("[shutdown] api %s has shut down", name)

    # no special significance to 10 workers, seems like a reasonable number given the number of services we have
    with ThreadPoolExecutor(max_workers=10) as executor:
        for service_name in names:
            executor.submit(check, service_name)
×
685

686

687
def check_service_health(api, expect_shutdown=False):
    """
    Checks the given api and raises if the outcome matches the unwanted one.

    :param api: the name of the api to check
    :param expect_shutdown: if ``True``, a successful check counts as failure (the service is still up)
    :raises Exception: if the check result equals ``expect_shutdown``
    """
    status = SERVICE_PLUGINS.check(api)
    if not (status == expect_shutdown):
        # outcome is the desired one (or the service was not checked at all)
        return

    if expect_shutdown:
        LOG.warning('Service "%s" still shutting down, retrying...', api)
    else:
        LOG.warning('Service "%s" not yet available, retrying...', api)
    raise Exception("Service check failed for api: %s" % api)
×
695

696

697
@hooks.on_infra_start(should_load=lambda: config.EAGER_SERVICE_LOADING)
def eager_load_services():
    """
    Infra start hook (only loaded when ``config.EAGER_SERVICE_LOADING`` is set) that requires every preloaded
    service up front. Failures are logged per service and do not abort the loop.
    """
    from localstack.utils.bootstrap import get_preloaded_services

    apis_to_load = get_preloaded_services()
    LOG.debug("Eager loading services: %s", sorted(apis_to_load))

    for service_name in apis_to_load:
        try:
            SERVICE_PLUGINS.require(service_name)
        except ServiceDisabled as disabled:
            # a disabled service is expected, only mention it on debug level
            LOG.debug("%s", disabled)
        except Exception:
            LOG.exception("could not load service plugin %s", service_name)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc