• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19844934392

01 Dec 2025 07:55PM UTC coverage: 86.945% (+0.1%) from 86.821%
19844934392

push

github

web-flow
Update ASF APIs, provider signatures, disable lambda patches (#13444)

Co-authored-by: LocalStack Bot <localstack-bot@users.noreply.github.com>
Co-authored-by: Silvio Vasiljevic <silvio.vasiljevic@gmail.com>

69707 of 80174 relevant lines covered (86.94%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.69
/localstack-core/localstack/services/es/provider.py
1
from contextlib import contextmanager
1✔
2
from typing import cast
1✔
3

4
from botocore.exceptions import ClientError
1✔
5

6
from localstack.aws.api import RequestContext, handler
1✔
7
from localstack.aws.api.es import (
1✔
8
    ARN,
9
    AccessDeniedException,
10
    AdvancedOptions,
11
    AdvancedSecurityOptionsInput,
12
    AutoTuneOptionsInput,
13
    CognitoOptions,
14
    CompatibleElasticsearchVersionsList,
15
    CompatibleVersionsMap,
16
    ConflictException,
17
    CreateElasticsearchDomainResponse,
18
    DeleteElasticsearchDomainResponse,
19
    DescribeElasticsearchDomainConfigResponse,
20
    DescribeElasticsearchDomainResponse,
21
    DescribeElasticsearchDomainsResponse,
22
    DisabledOperationException,
23
    DomainEndpointOptions,
24
    DomainInfoList,
25
    DomainName,
26
    DomainNameList,
27
    EBSOptions,
28
    ElasticsearchClusterConfig,
29
    ElasticsearchClusterConfigStatus,
30
    ElasticsearchDomainConfig,
31
    ElasticsearchDomainStatus,
32
    ElasticsearchVersionStatus,
33
    ElasticsearchVersionString,
34
    EncryptionAtRestOptions,
35
    EngineType,
36
    EsApi,
37
    GetCompatibleElasticsearchVersionsResponse,
38
    InternalException,
39
    InvalidPaginationTokenException,
40
    InvalidTypeException,
41
    LimitExceededException,
42
    ListDomainNamesResponse,
43
    ListElasticsearchVersionsResponse,
44
    ListTagsResponse,
45
    LogPublishingOptions,
46
    MaxResults,
47
    NextToken,
48
    NodeToNodeEncryptionOptions,
49
    OptionStatus,
50
    PolicyDocument,
51
    ResourceAlreadyExistsException,
52
    ResourceNotFoundException,
53
    SnapshotOptions,
54
    StringList,
55
    TagList,
56
    UpdateElasticsearchDomainConfigRequest,
57
    UpdateElasticsearchDomainConfigResponse,
58
    ValidationException,
59
    VPCOptions,
60
)
61
from localstack.aws.api.es import BaseException as EsBaseException
1✔
62
from localstack.aws.api.opensearch import (
1✔
63
    ClusterConfig,
64
    CompatibleVersionsList,
65
    DomainConfig,
66
    DomainStatus,
67
    VersionString,
68
)
69
from localstack.aws.connect import connect_to
1✔
70
from localstack.services.opensearch.packages import ELASTICSEARCH_DEFAULT_VERSION
1✔
71
from localstack.state import StateVisitor
1✔
72

73

74
def _version_to_opensearch(
1✔
75
    version: ElasticsearchVersionString | None,
76
) -> VersionString | None:
77
    if version is not None:
1✔
78
        if version.startswith("OpenSearch_"):
1✔
79
            return version
×
80
        else:
81
            return f"Elasticsearch_{version}"
1✔
82

83

84
def _version_from_opensearch(
1✔
85
    version: VersionString | None,
86
) -> ElasticsearchVersionString | None:
87
    if version is not None:
1✔
88
        if version.startswith("Elasticsearch_"):
1✔
89
            return version.split("_")[1]
1✔
90
        else:
91
            return version
×
92

93

94
def _instancetype_to_opensearch(instance_type: str | None) -> str | None:
1✔
95
    if instance_type is not None:
1✔
96
        return instance_type.replace("elasticsearch", "search")
1✔
97

98

99
def _instancetype_from_opensearch(instance_type: str | None) -> str | None:
1✔
100
    if instance_type is not None:
1✔
101
        return instance_type.replace("search", "elasticsearch")
1✔
102

103

104
def _clusterconfig_from_opensearch(
1✔
105
    cluster_config: ClusterConfig | None,
106
) -> ElasticsearchClusterConfig | None:
107
    if cluster_config is not None:
1✔
108
        # Just take the whole typed dict and typecast it to our target type
109
        result = cast(ElasticsearchClusterConfig, cluster_config)
1✔
110

111
        # Adjust the instance type names
112
        result["InstanceType"] = _instancetype_from_opensearch(cluster_config.get("InstanceType"))
1✔
113
        result["DedicatedMasterType"] = _instancetype_from_opensearch(
1✔
114
            cluster_config.get("DedicatedMasterType")
115
        )
116
        result["WarmType"] = _instancetype_from_opensearch(cluster_config.get("WarmType"))
1✔
117
        return result
1✔
118

119

120
def _domainstatus_from_opensearch(
1✔
121
    domain_status: DomainStatus | None,
122
) -> ElasticsearchDomainStatus | None:
123
    if domain_status is not None:
1✔
124
        # Just take the whole typed dict and typecast it to our target type
125
        result = cast(ElasticsearchDomainStatus, domain_status)
1✔
126
        # Only specifically handle keys which are named differently or their values differ (version and clusterconfig)
127
        result["ElasticsearchVersion"] = _version_from_opensearch(
1✔
128
            domain_status.get("EngineVersion")
129
        )
130
        result["ElasticsearchClusterConfig"] = _clusterconfig_from_opensearch(
1✔
131
            domain_status.get("ClusterConfig")
132
        )
133
        result.pop("EngineVersion", None)
1✔
134
        result.pop("ClusterConfig", None)
1✔
135
        return result
1✔
136

137

138
def _clusterconfig_to_opensearch(
1✔
139
    elasticsearch_cluster_config: ElasticsearchClusterConfig | None,
140
) -> ClusterConfig | None:
141
    if elasticsearch_cluster_config is not None:
1✔
142
        result = cast(ClusterConfig, elasticsearch_cluster_config)
1✔
143
        if instance_type := result.get("InstanceType"):
1✔
144
            result["InstanceType"] = _instancetype_to_opensearch(instance_type)
1✔
145
        if dedicated_master_type := result.get("DedicatedMasterType"):
1✔
146
            result["DedicatedMasterType"] = _instancetype_to_opensearch(dedicated_master_type)
1✔
147
        if warm_type := result.get("WarmType"):
1✔
148
            result["WarmType"] = _instancetype_to_opensearch(warm_type)
1✔
149
        return result
1✔
150

151

152
def _domainconfig_from_opensearch(
1✔
153
    domain_config: DomainConfig | None,
154
) -> ElasticsearchDomainConfig | None:
155
    if domain_config is not None:
×
156
        result = cast(ElasticsearchDomainConfig, domain_config)
×
157
        engine_version = domain_config.get("EngineVersion", {})
×
158
        result["ElasticsearchVersion"] = ElasticsearchVersionStatus(
×
159
            Options=_version_from_opensearch(engine_version.get("Options")),
160
            Status=cast(OptionStatus, engine_version.get("Status")),
161
        )
162
        cluster_config = domain_config.get("ClusterConfig", {})
×
163
        result["ElasticsearchClusterConfig"] = ElasticsearchClusterConfigStatus(
×
164
            Options=_clusterconfig_from_opensearch(cluster_config.get("Options")),
165
            Status=cluster_config.get("Status"),
166
        )
167
        result.pop("EngineVersion", None)
×
168
        result.pop("ClusterConfig", None)
×
169
        return result
×
170

171

172
def _compatible_version_list_from_opensearch(
1✔
173
    compatible_version_list: CompatibleVersionsList | None,
174
) -> CompatibleElasticsearchVersionsList | None:
175
    if compatible_version_list is not None:
×
176
        return [
×
177
            CompatibleVersionsMap(
178
                SourceVersion=_version_from_opensearch(version_map["SourceVersion"]),
179
                TargetVersions=[
180
                    _version_from_opensearch(target_version)
181
                    for target_version in version_map["TargetVersions"]
182
                ],
183
            )
184
            for version_map in compatible_version_list
185
        ]
186

187

188
@contextmanager
1✔
189
def exception_mapper():
1✔
190
    """Maps an exception thrown by the OpenSearch client to an exception thrown by the ElasticSearch API."""
191
    try:
1✔
192
        yield
1✔
193
    except ClientError as err:
×
194
        exception_types = {
×
195
            "AccessDeniedException": AccessDeniedException,
196
            "BaseException": EsBaseException,
197
            "ConflictException": ConflictException,
198
            "DisabledOperationException": DisabledOperationException,
199
            "InternalException": InternalException,
200
            "InvalidPaginationTokenException": InvalidPaginationTokenException,
201
            "InvalidTypeException": InvalidTypeException,
202
            "LimitExceededException": LimitExceededException,
203
            "ResourceAlreadyExistsException": ResourceAlreadyExistsException,
204
            "ResourceNotFoundException": ResourceNotFoundException,
205
            "ValidationException": ValidationException,
206
        }
207
        mapped_exception_type = exception_types.get(err.response["Error"]["Code"], EsBaseException)
×
208
        raise mapped_exception_type(err.response["Error"]["Message"])
×
209

210

211
class EsProvider(EsApi):
1✔
212
    def accept_state_visitor(self, visitor: StateVisitor):
1✔
213
        # ES state entirely depends on `opensearch`, and delegates its entire state to it
214
        # we do not need to manage state in ES
215
        pass
×
216

217
    def create_elasticsearch_domain(
1✔
218
        self,
219
        context: RequestContext,
220
        domain_name: DomainName,
221
        elasticsearch_version: ElasticsearchVersionString = None,
222
        elasticsearch_cluster_config: ElasticsearchClusterConfig = None,
223
        ebs_options: EBSOptions = None,
224
        access_policies: PolicyDocument = None,
225
        snapshot_options: SnapshotOptions = None,
226
        vpc_options: VPCOptions = None,
227
        cognito_options: CognitoOptions = None,
228
        encryption_at_rest_options: EncryptionAtRestOptions = None,
229
        node_to_node_encryption_options: NodeToNodeEncryptionOptions = None,
230
        advanced_options: AdvancedOptions = None,
231
        log_publishing_options: LogPublishingOptions = None,
232
        domain_endpoint_options: DomainEndpointOptions = None,
233
        advanced_security_options: AdvancedSecurityOptionsInput = None,
234
        auto_tune_options: AutoTuneOptionsInput = None,
235
        tag_list: TagList = None,
236
        **kwargs,
237
    ) -> CreateElasticsearchDomainResponse:
238
        opensearch_client = connect_to(
1✔
239
            region_name=context.region, aws_access_key_id=context.account_id
240
        ).opensearch
241
        # If no version is given, we set our default elasticsearch version
242
        engine_version = (
1✔
243
            _version_to_opensearch(elasticsearch_version)
244
            if elasticsearch_version
245
            else ELASTICSEARCH_DEFAULT_VERSION
246
        )
247
        kwargs = {
1✔
248
            "DomainName": domain_name,
249
            "EngineVersion": engine_version,
250
            "ClusterConfig": _clusterconfig_to_opensearch(elasticsearch_cluster_config),
251
            "EBSOptions": ebs_options,
252
            "AccessPolicies": access_policies,
253
            "SnapshotOptions": snapshot_options,
254
            "VPCOptions": vpc_options,
255
            "CognitoOptions": cognito_options,
256
            "EncryptionAtRestOptions": encryption_at_rest_options,
257
            "NodeToNodeEncryptionOptions": node_to_node_encryption_options,
258
            "AdvancedOptions": advanced_options,
259
            "LogPublishingOptions": log_publishing_options,
260
            "DomainEndpointOptions": domain_endpoint_options,
261
            "AdvancedSecurityOptions": advanced_security_options,
262
            "AutoTuneOptions": auto_tune_options,
263
            "TagList": tag_list,
264
        }
265

266
        # Filter the kwargs to not set None values at all (boto doesn't like that)
267
        kwargs = {key: value for key, value in kwargs.items() if value is not None}
1✔
268

269
        with exception_mapper():
1✔
270
            domain_status = opensearch_client.create_domain(**kwargs)["DomainStatus"]
1✔
271

272
        status = _domainstatus_from_opensearch(domain_status)
1✔
273
        return CreateElasticsearchDomainResponse(DomainStatus=status)
1✔
274

275
    def delete_elasticsearch_domain(
1✔
276
        self, context: RequestContext, domain_name: DomainName, **kwargs
277
    ) -> DeleteElasticsearchDomainResponse:
278
        opensearch_client = connect_to(
1✔
279
            region_name=context.region, aws_access_key_id=context.account_id
280
        ).opensearch
281

282
        with exception_mapper():
1✔
283
            domain_status = opensearch_client.delete_domain(
1✔
284
                DomainName=domain_name,
285
            )["DomainStatus"]
286

287
        status = _domainstatus_from_opensearch(domain_status)
1✔
288
        return DeleteElasticsearchDomainResponse(DomainStatus=status)
1✔
289

290
    def describe_elasticsearch_domain(
1✔
291
        self, context: RequestContext, domain_name: DomainName, **kwargs
292
    ) -> DescribeElasticsearchDomainResponse:
293
        opensearch_client = connect_to(
1✔
294
            region_name=context.region, aws_access_key_id=context.account_id
295
        ).opensearch
296

297
        with exception_mapper():
1✔
298
            opensearch_status = opensearch_client.describe_domain(
1✔
299
                DomainName=domain_name,
300
            )["DomainStatus"]
301

302
        status = _domainstatus_from_opensearch(opensearch_status)
1✔
303
        return DescribeElasticsearchDomainResponse(DomainStatus=status)
1✔
304

305
    @handler("UpdateElasticsearchDomainConfig", expand=False)
1✔
306
    def update_elasticsearch_domain_config(
1✔
307
        self, context: RequestContext, payload: UpdateElasticsearchDomainConfigRequest
308
    ) -> UpdateElasticsearchDomainConfigResponse:
309
        opensearch_client = connect_to(
×
310
            region_name=context.region, aws_access_key_id=context.account_id
311
        ).opensearch
312

313
        payload: dict
314
        if "ElasticsearchClusterConfig" in payload:
×
315
            payload["ClusterConfig"] = payload["ElasticsearchClusterConfig"]
×
316
            payload["ClusterConfig"]["InstanceType"] = _instancetype_to_opensearch(
×
317
                payload["ClusterConfig"]["InstanceType"]
318
            )
319
            payload.pop("ElasticsearchClusterConfig")
×
320

321
        with exception_mapper():
×
322
            opensearch_config = opensearch_client.update_domain_config(**payload)["DomainConfig"]
×
323

324
        config = _domainconfig_from_opensearch(opensearch_config)
×
325
        return UpdateElasticsearchDomainConfigResponse(DomainConfig=config)
×
326

327
    def describe_elasticsearch_domains(
1✔
328
        self, context: RequestContext, domain_names: DomainNameList, **kwargs
329
    ) -> DescribeElasticsearchDomainsResponse:
330
        opensearch_client = connect_to(
×
331
            region_name=context.region, aws_access_key_id=context.account_id
332
        ).opensearch
333

334
        with exception_mapper():
×
335
            opensearch_status_list = opensearch_client.describe_domains(
×
336
                DomainNames=domain_names,
337
            )["DomainStatusList"]
338

339
        status_list = [_domainstatus_from_opensearch(s) for s in opensearch_status_list]
×
340
        return DescribeElasticsearchDomainsResponse(DomainStatusList=status_list)
×
341

342
    def list_domain_names(
1✔
343
        self, context: RequestContext, engine_type: EngineType = None, **kwargs
344
    ) -> ListDomainNamesResponse:
345
        opensearch_client = connect_to(
×
346
            region_name=context.region, aws_access_key_id=context.account_id
347
        ).opensearch
348
        # Only hand the EngineType param to boto if it's set
349
        kwargs = {}
×
350
        if engine_type:
×
351
            kwargs["EngineType"] = engine_type
×
352

353
        with exception_mapper():
×
354
            domain_names = opensearch_client.list_domain_names(**kwargs)["DomainNames"]
×
355

356
        return ListDomainNamesResponse(DomainNames=cast(DomainInfoList | None, domain_names))
×
357

358
    def list_elasticsearch_versions(
1✔
359
        self,
360
        context: RequestContext,
361
        max_results: MaxResults = None,
362
        next_token: NextToken = None,
363
        **kwargs,
364
    ) -> ListElasticsearchVersionsResponse:
365
        opensearch_client = connect_to(
×
366
            region_name=context.region, aws_access_key_id=context.account_id
367
        ).opensearch
368
        # Construct the arguments as kwargs to not set None values at all (boto doesn't like that)
369
        kwargs = {
×
370
            key: value
371
            for key, value in {"MaxResults": max_results, "NextToken": next_token}.items()
372
            if value is not None
373
        }
374
        with exception_mapper():
×
375
            versions = opensearch_client.list_versions(**kwargs)
×
376

377
        return ListElasticsearchVersionsResponse(
×
378
            ElasticsearchVersions=[
379
                _version_from_opensearch(version) for version in versions["Versions"]
380
            ],
381
            NextToken=versions.get(next_token),
382
        )
383

384
    def get_compatible_elasticsearch_versions(
1✔
385
        self, context: RequestContext, domain_name: DomainName = None, **kwargs
386
    ) -> GetCompatibleElasticsearchVersionsResponse:
387
        opensearch_client = connect_to(
×
388
            region_name=context.region, aws_access_key_id=context.account_id
389
        ).opensearch
390
        # Only hand the DomainName param to boto if it's set
391
        kwargs = {}
×
392
        if domain_name:
×
393
            kwargs["DomainName"] = domain_name
×
394

395
        with exception_mapper():
×
396
            compatible_versions_response = opensearch_client.get_compatible_versions(**kwargs)
×
397

398
        compatible_versions = compatible_versions_response.get("CompatibleVersions")
×
399
        return GetCompatibleElasticsearchVersionsResponse(
×
400
            CompatibleElasticsearchVersions=_compatible_version_list_from_opensearch(
401
                compatible_versions
402
            )
403
        )
404

405
    def describe_elasticsearch_domain_config(
1✔
406
        self, context: RequestContext, domain_name: DomainName, **kwargs
407
    ) -> DescribeElasticsearchDomainConfigResponse:
408
        opensearch_client = connect_to(
×
409
            region_name=context.region, aws_access_key_id=context.account_id
410
        ).opensearch
411

412
        with exception_mapper():
×
413
            domain_config = opensearch_client.describe_domain_config(DomainName=domain_name).get(
×
414
                "DomainConfig"
415
            )
416

417
        return DescribeElasticsearchDomainConfigResponse(
×
418
            DomainConfig=_domainconfig_from_opensearch(domain_config)
419
        )
420

421
    def add_tags(self, context: RequestContext, arn: ARN, tag_list: TagList, **kwargs) -> None:
1✔
422
        opensearch_client = connect_to(
1✔
423
            region_name=context.region, aws_access_key_id=context.account_id
424
        ).opensearch
425

426
        with exception_mapper():
1✔
427
            opensearch_client.add_tags(ARN=arn, TagList=tag_list)
1✔
428

429
    def list_tags(self, context: RequestContext, arn: ARN, **kwargs) -> ListTagsResponse:
1✔
430
        opensearch_client = connect_to(
1✔
431
            region_name=context.region, aws_access_key_id=context.account_id
432
        ).opensearch
433

434
        with exception_mapper():
1✔
435
            response = opensearch_client.list_tags(ARN=arn)
1✔
436

437
        return ListTagsResponse(TagList=response.get("TagList"))
1✔
438

439
    def remove_tags(
1✔
440
        self, context: RequestContext, arn: ARN, tag_keys: StringList, **kwargs
441
    ) -> None:
442
        opensearch_client = connect_to(
×
443
            region_name=context.region, aws_access_key_id=context.account_id
444
        ).opensearch
445

446
        with exception_mapper():
×
447
            opensearch_client.remove_tags(ARN=arn, TagKeys=tag_keys)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc