• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20565403496

29 Dec 2025 05:11AM UTC coverage: 84.103% (-2.8%) from 86.921%
20565403496

Pull #13567

github

web-flow
Merge 4816837a5 into 2417384aa
Pull Request #13567: Update ASF APIs

67166 of 79862 relevant lines covered (84.1%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.45
/localstack-core/localstack/aws/spec.py
1
import dataclasses
1✔
2
import json
1✔
3
import logging
1✔
4
import os
1✔
5
import sys
1✔
6
from collections import defaultdict
1✔
7
from collections.abc import Generator
1✔
8
from functools import cached_property, lru_cache
1✔
9
from typing import Literal, NamedTuple
1✔
10

11
import botocore
1✔
12
import jsonpatch
1✔
13
from botocore.exceptions import UnknownServiceError
1✔
14
from botocore.loaders import Loader, instance_cache
1✔
15
from botocore.model import OperationModel, ServiceModel
1✔
16

17
from localstack import config
1✔
18
from localstack.constants import VERSION
1✔
19
from localstack.utils.objects import singleton_factory
1✔
20

21
LOG = logging.getLogger(__name__)
1✔
22

23
ServiceName = str
1✔
24
ProtocolName = Literal["query", "json", "rest-json", "rest-xml", "ec2", "smithy-rpc-v2-cbor"]
1✔
25

26

27
class ServiceModelIdentifier(NamedTuple):
1✔
28
    """
29
    Identifies a specific service model.
30
    If the protocol is not given, the default protocol of the service with the specific name is assumed.
31
    Maybe also add versions here in the future (if we can support multiple different versions for one service).
32
    """
33

34
    name: ServiceName
1✔
35
    protocol: ProtocolName | None = None
1✔
36
    protocols: tuple[ProtocolName] | None = None
1✔
37

38

39
spec_patches_json = os.path.join(os.path.dirname(__file__), "spec-patches.json")
1✔
40

41

42
def load_spec_patches() -> dict[str, list]:
1✔
43
    if not os.path.exists(spec_patches_json):
1✔
44
        return {}
×
45
    with open(spec_patches_json) as fd:
1✔
46
        return json.load(fd)
1✔
47

48

49
# Path for custom specs which are not (anymore) provided by botocore
50
LOCALSTACK_BUILTIN_DATA_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data")
1✔
51

52

53
class LocalStackBuiltInDataLoaderMixin(Loader):
1✔
54
    def __init__(self, *args, **kwargs):
1✔
55
        # add the builtin data path to the extra_search_paths to ensure they are discovered by the loader
56
        super().__init__(*args, extra_search_paths=[LOCALSTACK_BUILTIN_DATA_PATH], **kwargs)
1✔
57

58

59
class PatchingLoader(Loader):
1✔
60
    """
61
    A custom botocore Loader that applies JSON patches from the given json patch file to the specs as they are loaded.
62
    """
63

64
    patches: dict[str, list]
1✔
65

66
    def __init__(self, patches: dict[str, list], *args, **kwargs):
1✔
67
        # add the builtin data path to the extra_search_paths to ensure they are discovered by the loader
68
        super().__init__(*args, **kwargs)
1✔
69
        self.patches = patches
1✔
70

71
    @instance_cache
1✔
72
    def load_data(self, name: str):
1✔
73
        result = super().load_data(name)
1✔
74

75
        if patches := self.patches.get(name):
1✔
76
            return jsonpatch.apply_patch(result, patches)
1✔
77

78
        return result
1✔
79

80

81
class CustomLoader(PatchingLoader, LocalStackBuiltInDataLoaderMixin):
1✔
82
    # Class mixing the different loader features (patching, localstack specific data)
83
    pass
1✔
84

85

86
loader = CustomLoader(load_spec_patches())
1✔
87

88

89
class UnknownServiceProtocolError(UnknownServiceError):
1✔
90
    """Raised when trying to load a service with an unknown protocol.
91

92
    :ivar service_name: The name of the service.
93
    :ivar protocol: The name of the unknown protocol.
94
    """
95

96
    fmt = "Unknown service protocol: '{service_name}-{protocol}'."
1✔
97

98

99
def list_services() -> list[ServiceModel]:
1✔
100
    return [load_service(service) for service in loader.list_available_services("service-2")]
1✔
101

102

103
def load_service(
1✔
104
    service: ServiceName, version: str | None = None, protocol: ProtocolName | None = None
105
) -> ServiceModel:
106
    """
107
    Loads a service
108
    :param service: to load, f.e. "sqs". For custom, internalized, service protocol specs (f.e. sqs-query) it's also
109
                    possible to directly define the protocol in the service name (f.e. use sqs-query)
110
    :param version: of the service to load, f.e. "2012-11-05", by default the latest version will be used
111
    :param protocol: specific protocol to load for the specific service, f.e. "json" for the "sqs" service
112
                     if the service cannot be found
113
    :return: Loaded service model of the service
114
    :raises: UnknownServiceError if the service cannot be found
115
    :raises: UnknownServiceProtocolError if the specific protocol of the service cannot be found
116
    """
117
    service_description = loader.load_service_model(service, "service-2", version)
1✔
118
    service_metadata = service_description.get("metadata", {})
1✔
119
    service_protocols = {service_metadata.get("protocol")}
1✔
120
    if protocols := service_metadata.get("protocols"):
1✔
121
        service_protocols.update(protocols)
1✔
122

123
    # check if the protocol is defined, and if so, if the loaded service defines this protocol
124
    if protocol is not None and protocol not in service_protocols:
1✔
125
        # if the protocol is defined, but not the one of the currently loaded service,
126
        # check if we already loaded the custom spec based on the naming convention (<service>-<protocol>),
127
        # f.e. "sqs-query"
128
        if service.endswith(f"-{protocol}"):
1✔
129
            # if so, we raise an exception
130
            raise UnknownServiceProtocolError(service_name=service, protocol=protocol)
×
131
        # otherwise we try to load it (recursively)
132
        try:
1✔
133
            return load_service(f"{service}-{protocol}", version, protocol=protocol)
1✔
134
        except UnknownServiceError:
1✔
135
            # raise an unknown protocol error in case the service also can't be loaded with the naming convention
136
            raise UnknownServiceProtocolError(service_name=service, protocol=protocol)
1✔
137

138
    # remove potential protocol names from the service name
139
    # FIXME add more protocols here if we have to internalize more than just sqs-query
140
    # TODO this should not contain specific internalized service names
141
    service = {"sqs-query": "sqs"}.get(service, service)
1✔
142
    return ServiceModel(service_description, service)
1✔
143

144

145
def iterate_service_operations() -> Generator[tuple[ServiceModel, OperationModel], None, None]:
1✔
146
    """
147
    Returns one record per operation in the AWS service spec, where the first item is the service model the operation
148
    belongs to, and the second is the operation model.
149

150
    :return: an iterable
151
    """
152
    for service in list_services():
1✔
153
        for op_name in service.operation_names:
1✔
154
            yield service, service.operation_model(op_name)
1✔
155

156

157
def is_protocol_in_service_model_identifier(
1✔
158
    protocol: ProtocolName, service_model_identifier: ServiceModelIdentifier
159
) -> bool:
160
    """
161
    :param protocol: the protocol name to check
162
    :param service_model_identifier:
163
    :return: boolean to indicate if the protocol is available for that service
164
    """
165
    protocols = service_model_identifier.protocols or []
1✔
166
    return protocol in protocols or protocol == service_model_identifier.protocol
1✔
167

168

169
def get_service_model_identifier(service_model: ServiceModel) -> ServiceModelIdentifier:
1✔
170
    protocols = service_model.metadata.get("protocols")
1✔
171
    return ServiceModelIdentifier(
1✔
172
        name=service_model.service_name,
173
        protocol=service_model.protocol,
174
        protocols=tuple(protocols) if protocols else None,
175
    )
176

177

178
@dataclasses.dataclass
1✔
179
class ServiceCatalogIndex:
1✔
180
    """
181
    The ServiceCatalogIndex enables fast lookups for common operations to determine a service from service indicators.
182
    """
183

184
    service_names: list[ServiceName]
1✔
185
    target_prefix_index: dict[str, list[ServiceModelIdentifier]]
1✔
186
    signing_name_index: dict[str, list[ServiceModelIdentifier]]
1✔
187
    operations_index: dict[str, list[ServiceModelIdentifier]]
1✔
188
    endpoint_prefix_index: dict[str, list[ServiceModelIdentifier]]
1✔
189

190

191
class LazyServiceCatalogIndex:
1✔
192
    """
193
    A ServiceCatalogIndex that builds indexes in-memory from the spec.
194
    """
195

196
    @cached_property
1✔
197
    def service_names(self) -> list[ServiceName]:
1✔
198
        return list(self._services.keys())
1✔
199

200
    @cached_property
1✔
201
    def target_prefix_index(self) -> dict[str, list[ServiceModelIdentifier]]:
1✔
202
        result = defaultdict(list)
1✔
203
        for service_models in self._services.values():
1✔
204
            for service_model in service_models:
1✔
205
                target_prefix = service_model.metadata.get("targetPrefix")
1✔
206
                if target_prefix:
1✔
207
                    result[target_prefix].append(get_service_model_identifier(service_model))
1✔
208
        return dict(result)
1✔
209

210
    @cached_property
1✔
211
    def signing_name_index(self) -> dict[str, list[ServiceModelIdentifier]]:
1✔
212
        result = defaultdict(list)
1✔
213
        for service_models in self._services.values():
1✔
214
            for service_model in service_models:
1✔
215
                result[service_model.signing_name].append(
1✔
216
                    get_service_model_identifier(service_model)
217
                )
218
        return dict(result)
1✔
219

220
    @cached_property
1✔
221
    def operations_index(self) -> dict[str, list[ServiceModelIdentifier]]:
1✔
222
        result = defaultdict(list)
1✔
223
        for service_models in self._services.values():
1✔
224
            for service_model in service_models:
1✔
225
                operations = service_model.operation_names
1✔
226
                if operations:
1✔
227
                    for operation in operations:
1✔
228
                        result[operation].append(get_service_model_identifier(service_model))
1✔
229
        return dict(result)
1✔
230

231
    @cached_property
1✔
232
    def endpoint_prefix_index(self) -> dict[str, list[ServiceModelIdentifier]]:
1✔
233
        result = defaultdict(list)
1✔
234
        for service_models in self._services.values():
1✔
235
            for service_model in service_models:
1✔
236
                result[service_model.endpoint_prefix].append(
1✔
237
                    get_service_model_identifier(service_model)
238
                )
239
        return dict(result)
1✔
240

241
    @cached_property
1✔
242
    def _services(self) -> dict[ServiceName, list[ServiceModel]]:
1✔
243
        services = defaultdict(list)
1✔
244
        for service in list_services():
1✔
245
            services[service.service_name].append(service)
1✔
246
        return services
1✔
247

248

249
class ServiceCatalog:
1✔
250
    index: ServiceCatalogIndex
1✔
251

252
    def __init__(self, index: ServiceCatalogIndex = None):
1✔
253
        self.index = index or LazyServiceCatalogIndex()
1✔
254

255
    @lru_cache(maxsize=512)
1✔
256
    def get(self, name: ServiceName, protocol: ProtocolName | None = None) -> ServiceModel | None:
1✔
257
        return load_service(name, protocol=protocol)
1✔
258

259
    @property
1✔
260
    def service_names(self) -> list[ServiceName]:
1✔
261
        return self.index.service_names
1✔
262

263
    @property
1✔
264
    def target_prefix_index(self) -> dict[str, list[ServiceModelIdentifier]]:
1✔
265
        return self.index.target_prefix_index
1✔
266

267
    @property
1✔
268
    def signing_name_index(self) -> dict[str, list[ServiceModelIdentifier]]:
1✔
269
        return self.index.signing_name_index
1✔
270

271
    @property
1✔
272
    def operations_index(self) -> dict[str, list[ServiceModelIdentifier]]:
1✔
273
        return self.index.operations_index
1✔
274

275
    @property
1✔
276
    def endpoint_prefix_index(self) -> dict[str, list[ServiceModelIdentifier]]:
1✔
277
        return self.index.endpoint_prefix_index
1✔
278

279
    def by_target_prefix(self, target_prefix: str) -> list[ServiceModelIdentifier]:
1✔
280
        return self.target_prefix_index.get(target_prefix, [])
1✔
281

282
    def by_signing_name(self, signing_name: str) -> list[ServiceModelIdentifier]:
1✔
283
        return self.signing_name_index.get(signing_name, [])
1✔
284

285
    def by_operation(self, operation_name: str) -> list[ServiceModelIdentifier]:
1✔
286
        return self.operations_index.get(operation_name, [])
1✔
287

288

289
def build_service_index_cache(file_path: str) -> ServiceCatalogIndex:
1✔
290
    """
291
    Creates a new ServiceCatalogIndex and stores it into the given file_path.
292

293
    :param file_path: the path to store the file to
294
    :return: the created ServiceCatalogIndex
295
    """
296
    return save_service_index_cache(LazyServiceCatalogIndex(), file_path)
×
297

298

299
def load_service_index_cache(file: str) -> ServiceCatalogIndex:
1✔
300
    """
301
    Loads from the given file the stored ServiceCatalogIndex.
302

303
    :param file: the file to load from
304
    :return: the loaded ServiceCatalogIndex
305
    """
306
    import dill
1✔
307

308
    with open(file, "rb") as fd:
1✔
309
        return dill.load(fd)
1✔
310

311

312
def save_service_index_cache(index: LazyServiceCatalogIndex, file_path: str) -> ServiceCatalogIndex:
1✔
313
    """
314
    Creates from the given LazyServiceCatalogIndex a ``ServiceCatalogIndex`, stores its contents into the given file,
315
    and then returns the newly created index.
316

317
    :param index: the LazyServiceCatalogIndex to store the index from.
318
    :param file_path: the path to store the binary index cache file to
319
    :return: the created ServiceCatalogIndex
320
    """
321
    import dill
1✔
322

323
    cache = ServiceCatalogIndex(
1✔
324
        service_names=index.service_names,
325
        endpoint_prefix_index=index.endpoint_prefix_index,
326
        operations_index=index.operations_index,
327
        signing_name_index=index.signing_name_index,
328
        target_prefix_index=index.target_prefix_index,
329
    )
330
    with open(file_path, "wb") as fd:
1✔
331
        # use dill (instead of plain pickle) to avoid issues when serializing the pickle from __main__
332
        dill.dump(cache, fd)
1✔
333
    return cache
1✔
334

335

336
def _get_catalog_filename():
1✔
337
    ls_ver = VERSION.replace(".", "_")
1✔
338
    botocore_ver = botocore.__version__.replace(".", "_")
1✔
339
    return f"service-catalog-{ls_ver}-{botocore_ver}.dill"
1✔
340

341

342
@singleton_factory
1✔
343
def get_service_catalog() -> ServiceCatalog:
1✔
344
    """Loads the ServiceCatalog (which contains all the service specs), and potentially re-uses a cached index."""
345

346
    try:
1✔
347
        catalog_file_name = _get_catalog_filename()
1✔
348
        static_catalog_file = os.path.join(config.dirs.static_libs, catalog_file_name)
1✔
349

350
        # try to load or load/build/save the service catalog index from the static libs
351
        index = None
1✔
352
        if os.path.exists(static_catalog_file):
1✔
353
            # load the service catalog from the static libs dir / built at build time
354
            LOG.debug("loading service catalog index cache file %s", static_catalog_file)
1✔
355
            index = load_service_index_cache(static_catalog_file)
1✔
356
        elif os.path.isdir(config.dirs.cache):
1✔
357
            cache_catalog_file = os.path.join(config.dirs.cache, catalog_file_name)
×
358
            if os.path.exists(cache_catalog_file):
×
359
                LOG.debug("loading service catalog index cache file %s", cache_catalog_file)
×
360
                index = load_service_index_cache(cache_catalog_file)
×
361
            else:
362
                LOG.debug("building service catalog index cache file %s", cache_catalog_file)
×
363
                index = build_service_index_cache(cache_catalog_file)
×
364
        return ServiceCatalog(index)
1✔
365
    except Exception:
×
366
        LOG.error(
×
367
            "error while processing service catalog index cache, falling back to lazy-loaded index",
368
            exc_info=LOG.isEnabledFor(logging.DEBUG),
369
        )
370
        return ServiceCatalog()
×
371

372

373
def main():
1✔
374
    catalog_file_name = _get_catalog_filename()
×
375
    static_catalog_file = os.path.join(config.dirs.static_libs, catalog_file_name)
×
376

377
    if os.path.exists(static_catalog_file):
×
378
        LOG.error(
×
379
            "service catalog index cache file (%s) already there. aborting!", static_catalog_file
380
        )
381
        return 1
×
382

383
    # load the service catalog from the static libs dir / built at build time
384
    LOG.debug("building service catalog index cache file %s", static_catalog_file)
×
385
    build_service_index_cache(static_catalog_file)
×
386

387

388
if __name__ == "__main__":
389
    sys.exit(main())
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc