• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 22334798432

23 Feb 2026 06:42PM UTC coverage: 86.956% (-0.02%) from 86.973%
22334798432

push

github

web-flow
S3: regenerate test snapshots & parity fixes (#13824)

69831 of 80306 relevant lines covered (86.96%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.06
/localstack-core/localstack/services/opensearch/provider.py
1
import logging
1✔
2
import os
1✔
3
import re
1✔
4
import threading
1✔
5
from copy import deepcopy
1✔
6
from datetime import UTC, datetime
1✔
7
from random import randint
1✔
8
from urllib.parse import urlparse
1✔
9

10
from localstack import config
1✔
11
from localstack.aws.api import RequestContext, handler
1✔
12
from localstack.aws.api.opensearch import (
1✔
13
    ARN,
14
    AccessPoliciesStatus,
15
    AdvancedOptionsStatus,
16
    AdvancedSecurityOptions,
17
    AdvancedSecurityOptionsStatus,
18
    AutoTuneDesiredState,
19
    AutoTuneOptions,
20
    AutoTuneOptionsOutput,
21
    AutoTuneOptionsStatus,
22
    AutoTuneState,
23
    AutoTuneStatus,
24
    ClusterConfig,
25
    ClusterConfigStatus,
26
    CognitoOptions,
27
    CognitoOptionsStatus,
28
    ColdStorageOptions,
29
    CompatibleVersionsMap,
30
    CreateDomainRequest,
31
    CreateDomainResponse,
32
    DeleteDomainResponse,
33
    DeploymentStatus,
34
    DescribeDomainConfigResponse,
35
    DescribeDomainResponse,
36
    DescribeDomainsResponse,
37
    DomainConfig,
38
    DomainEndpointOptions,
39
    DomainEndpointOptionsStatus,
40
    DomainInfo,
41
    DomainName,
42
    DomainNameList,
43
    DomainProcessingStatusType,
44
    DomainStatus,
45
    EBSOptions,
46
    EBSOptionsStatus,
47
    EncryptionAtRestOptions,
48
    EncryptionAtRestOptionsStatus,
49
    EngineType,
50
    GetCompatibleVersionsResponse,
51
    ListDomainNamesResponse,
52
    ListTagsResponse,
53
    ListVersionsResponse,
54
    LogPublishingOptionsStatus,
55
    MaxResults,
56
    NextToken,
57
    NodeToNodeEncryptionOptions,
58
    NodeToNodeEncryptionOptionsStatus,
59
    OpensearchApi,
60
    OpenSearchPartitionInstanceType,
61
    OptionState,
62
    OptionStatus,
63
    ResourceAlreadyExistsException,
64
    ResourceNotFoundException,
65
    RollbackOnDisable,
66
    ServiceSoftwareOptions,
67
    SnapshotOptions,
68
    SnapshotOptionsStatus,
69
    StringList,
70
    TagList,
71
    TLSSecurityPolicy,
72
    UpdateDomainConfigRequest,
73
    UpdateDomainConfigResponse,
74
    ValidationException,
75
    VersionStatus,
76
    VolumeType,
77
    VPCDerivedInfoStatus,
78
)
79
from localstack.services.opensearch import versions
1✔
80
from localstack.services.opensearch.cluster import SecurityOptions
1✔
81
from localstack.services.opensearch.cluster_manager import (
1✔
82
    ClusterManager,
83
    DomainKey,
84
    create_cluster_manager,
85
)
86
from localstack.services.opensearch.models import OpenSearchStore, opensearch_stores
1✔
87
from localstack.services.opensearch.packages import OPENSEARCH_DEFAULT_VERSION
1✔
88
from localstack.services.plugins import ServiceLifecycleHook
1✔
89
from localstack.state import AssetDirectory, StateVisitor
1✔
90
from localstack.utils.aws.arns import parse_arn
1✔
91
from localstack.utils.collections import PaginatedList, remove_none_values_from_dict
1✔
92
from localstack.utils.serving import Server
1✔
93
from localstack.utils.urls import localstack_host
1✔
94

95
LOG = logging.getLogger(__name__)
1✔
96

97
# The singleton for the ClusterManager instance.
98
# The singleton is implemented this way only to be able to overwrite its value during tests.
99
__CLUSTER_MANAGER = None
1✔
100

101
# mutex for modifying domains
102
_domain_mutex = threading.RLock()
1✔
103

104
DEFAULT_OPENSEARCH_CLUSTER_CONFIG = ClusterConfig(
1✔
105
    InstanceType=OpenSearchPartitionInstanceType.m3_medium_search,
106
    InstanceCount=1,
107
    DedicatedMasterEnabled=True,
108
    ZoneAwarenessEnabled=False,
109
    DedicatedMasterType=OpenSearchPartitionInstanceType.m3_medium_search,
110
    DedicatedMasterCount=1,
111
)
112

113
DEFAULT_OPENSEARCH_DOMAIN_ENDPOINT_OPTIONS = DomainEndpointOptions(
1✔
114
    EnforceHTTPS=False,
115
    TLSSecurityPolicy=TLSSecurityPolicy.Policy_Min_TLS_1_0_2019_07,
116
    CustomEndpointEnabled=False,
117
)
118

119
DEFAULT_AUTOTUNE_OPTIONS = AutoTuneOptionsOutput(
1✔
120
    State=AutoTuneState.ENABLED,
121
    UseOffPeakWindow=False,
122
)
123

124

125
def cluster_manager() -> ClusterManager:
1✔
126
    global __CLUSTER_MANAGER
127
    if __CLUSTER_MANAGER is None:
1✔
128
        __CLUSTER_MANAGER = create_cluster_manager()
1✔
129
    return __CLUSTER_MANAGER
1✔
130

131

132
def _run_cluster_startup_monitor(cluster: Server, domain_name: str, region: str):
1✔
133
    LOG.debug("running cluster startup monitor for cluster %s", cluster)
1✔
134

135
    # wait until the cluster is started
136
    # NOTE: does not work when DNS rebind protection is active for localhost.localstack.cloud
137
    is_up = cluster.wait_is_up()
1✔
138

139
    LOG.debug("cluster state polling for %s returned! status = %s", domain_name, is_up)
1✔
140
    with _domain_mutex:
1✔
141
        store = OpensearchProvider.get_store(cluster.account_id, cluster.region_name)
1✔
142
        status = store.opensearch_domains.get(domain_name)
1✔
143
        if status is not None:
1✔
144
            status["Processing"] = False
1✔
145
        status["DomainProcessingStatus"] = DomainProcessingStatusType.Active
1✔
146

147

148
def create_cluster(
1✔
149
    domain_key: DomainKey,
150
    engine_version: str,
151
    domain_endpoint_options: DomainEndpointOptions | None,
152
    security_options: SecurityOptions | None,
153
    preferred_port: int | None = None,
154
):
155
    """
156
    Uses the ClusterManager to create a new cluster for the given domain key. NOT thread safe, needs to be called
157
    around _domain_mutex.
158
    If the preferred_port is given, this port will be preferred (if OPENSEARCH_ENDPOINT_STRATEGY == "port").
159
    """
160
    store = opensearch_stores[domain_key.account][domain_key.region]
1✔
161

162
    manager = cluster_manager()
1✔
163
    engine_version = engine_version or OPENSEARCH_DEFAULT_VERSION
1✔
164
    cluster = manager.create(
1✔
165
        arn=domain_key.arn,
166
        version=engine_version,
167
        endpoint_options=domain_endpoint_options,
168
        security_options=security_options,
169
        preferred_port=preferred_port,
170
    )
171

172
    # FIXME: in AWS, the Endpoint is set once the cluster is running, not before (like here), but our tests and
173
    #  in particular cloudformation currently relies on the assumption that it is set when the domain is created.
174
    status = store.opensearch_domains[domain_key.domain_name]
1✔
175
    # Replacing only 0.0.0.0 here as usage of this bind address mostly means running in docker which is used locally
176
    # If another bind address is used we want to keep it in the endpoint as this is a conscious user decision to
177
    # access from another device on the network.
178
    status["Endpoint"] = cluster.url.split("://")[-1].replace("0.0.0.0", localstack_host().host)
1✔
179
    status["EngineVersion"] = engine_version
1✔
180
    status["DomainProcessingStatus"] = DomainProcessingStatusType.Creating
1✔
181

182
    if cluster.is_up():
1✔
183
        status["Processing"] = False
1✔
184
        status["DomainProcessingStatus"] = DomainProcessingStatusType.Active
1✔
185
    else:
186
        # run a background thread that will update all domains that use this cluster to set
187
        # the cluster state once it is started, or the CLUSTER_STARTUP_TIMEOUT is reached
188
        threading.Thread(
1✔
189
            target=_run_cluster_startup_monitor,
190
            args=(cluster, domain_key.domain_name, domain_key.region),
191
            daemon=True,
192
        ).start()
193

194

195
def _remove_cluster(domain_key: DomainKey):
1✔
196
    parsed_arn = parse_arn(domain_key.arn)
1✔
197
    store = OpensearchProvider.get_store(parsed_arn["account"], parsed_arn["region"])
1✔
198
    cluster_manager().remove(domain_key.arn)
1✔
199
    del store.opensearch_domains[domain_key.domain_name]
1✔
200

201

202
def get_domain_config(domain_key) -> DomainConfig:
1✔
203
    status = get_domain_status(domain_key)
1✔
204
    return _status_to_config(status)
1✔
205

206

207
def _status_to_config(status: DomainStatus) -> DomainConfig:
1✔
208
    cluster_cfg = status.get("ClusterConfig") or {}
1✔
209
    default_cfg = DEFAULT_OPENSEARCH_CLUSTER_CONFIG
1✔
210
    config_status = get_domain_config_status()
1✔
211
    autotune_options = status.get("AutoTuneOptions") or DEFAULT_AUTOTUNE_OPTIONS
1✔
212
    autotune_state = autotune_options.get("State") or AutoTuneState.ENABLED
1✔
213
    desired_state = (
1✔
214
        AutoTuneDesiredState.ENABLED
215
        if autotune_state == AutoTuneState.ENABLED
216
        else AutoTuneDesiredState.DISABLED
217
    )
218
    return DomainConfig(
1✔
219
        AccessPolicies=AccessPoliciesStatus(
220
            Options=status.get("AccessPolicies", ""),
221
            Status=config_status,
222
        ),
223
        AdvancedOptions=AdvancedOptionsStatus(
224
            Options={
225
                "override_main_response_version": "false",
226
                "rest.action.multi.allow_explicit_index": "true",
227
            },
228
            Status=config_status,
229
        ),
230
        EBSOptions=EBSOptionsStatus(
231
            Options=EBSOptions(
232
                EBSEnabled=True,
233
                VolumeSize=100,
234
                VolumeType=VolumeType.gp2,
235
            ),
236
            Status=config_status,
237
        ),
238
        ClusterConfig=ClusterConfigStatus(
239
            Options=ClusterConfig(
240
                DedicatedMasterCount=cluster_cfg.get(
241
                    "DedicatedMasterCount", default_cfg["DedicatedMasterCount"]
242
                ),
243
                DedicatedMasterEnabled=cluster_cfg.get(
244
                    "DedicatedMasterEnabled", default_cfg["DedicatedMasterEnabled"]
245
                ),
246
                DedicatedMasterType=cluster_cfg.get(
247
                    "DedicatedMasterType", default_cfg["DedicatedMasterType"]
248
                ),
249
                InstanceCount=cluster_cfg.get("InstanceCount", default_cfg["InstanceCount"]),
250
                InstanceType=cluster_cfg.get("InstanceType", default_cfg["InstanceType"]),
251
                ZoneAwarenessEnabled=cluster_cfg.get(
252
                    "ZoneAwarenessEnabled", default_cfg["ZoneAwarenessEnabled"]
253
                ),
254
            ),
255
            Status=config_status,
256
        ),
257
        CognitoOptions=CognitoOptionsStatus(
258
            Options=CognitoOptions(Enabled=False), Status=config_status
259
        ),
260
        EngineVersion=VersionStatus(Options=status.get("EngineVersion"), Status=config_status),
261
        EncryptionAtRestOptions=EncryptionAtRestOptionsStatus(
262
            Options=EncryptionAtRestOptions(Enabled=False),
263
            Status=config_status,
264
        ),
265
        LogPublishingOptions=LogPublishingOptionsStatus(
266
            Options={},
267
            Status=config_status,
268
        ),
269
        SnapshotOptions=SnapshotOptionsStatus(
270
            Options=SnapshotOptions(AutomatedSnapshotStartHour=randint(0, 23)),
271
            Status=config_status,
272
        ),
273
        VPCOptions=VPCDerivedInfoStatus(
274
            Options={},
275
            Status=config_status,
276
        ),
277
        DomainEndpointOptions=DomainEndpointOptionsStatus(
278
            Options=status.get("DomainEndpointOptions", {}),
279
            Status=config_status,
280
        ),
281
        NodeToNodeEncryptionOptions=NodeToNodeEncryptionOptionsStatus(
282
            Options=NodeToNodeEncryptionOptions(Enabled=False),
283
            Status=config_status,
284
        ),
285
        AdvancedSecurityOptions=AdvancedSecurityOptionsStatus(
286
            Options=status.get("AdvancedSecurityOptions", {}), Status=config_status
287
        ),
288
        AutoTuneOptions=AutoTuneOptionsStatus(
289
            Options=AutoTuneOptions(
290
                DesiredState=desired_state,
291
                RollbackOnDisable=RollbackOnDisable.NO_ROLLBACK,
292
                MaintenanceSchedules=[],
293
                UseOffPeakWindow=autotune_options.get("UseOffPeakWindow", False),
294
            ),
295
            Status=AutoTuneStatus(
296
                CreationDate=config_status.get("CreationDate"),
297
                UpdateDate=config_status.get("UpdateDate"),
298
                UpdateVersion=config_status.get("UpdateVersion"),
299
                State=autotune_state,
300
                PendingDeletion=config_status.get("PendingDeletion"),
301
            ),
302
        ),
303
    )
304

305

306
def get_domain_config_status() -> OptionStatus:
1✔
307
    return OptionStatus(
1✔
308
        CreationDate=datetime.now(),
309
        PendingDeletion=False,
310
        State=OptionState.Active,
311
        UpdateDate=datetime.now(),
312
        UpdateVersion=randint(1, 100),
313
    )
314

315

316
def get_domain_status(
1✔
317
    domain_key: DomainKey, deleted=False, request: CreateDomainRequest | None = None
318
) -> DomainStatus:
319
    parsed_arn = parse_arn(domain_key.arn)
1✔
320
    store = OpensearchProvider.get_store(parsed_arn["account"], parsed_arn["region"])
1✔
321
    stored_status: DomainStatus = (
1✔
322
        store.opensearch_domains.get(domain_key.domain_name) or DomainStatus()
323
    )
324
    cluster_cfg = stored_status.get("ClusterConfig") or {}
1✔
325
    default_cfg = DEFAULT_OPENSEARCH_CLUSTER_CONFIG
1✔
326
    if request:
1✔
327
        stored_status = deepcopy(stored_status)
1✔
328
        stored_status.update(request)
1✔
329
        default_cfg.update(request.get("ClusterConfig", {}))
1✔
330

331
    autotune_options = stored_status.get("AutoTuneOptions") or deepcopy(DEFAULT_AUTOTUNE_OPTIONS)
1✔
332
    if request and (request_options := request.get("AutoTuneOptions")):
1✔
333
        desired_state = request_options.get("DesiredState") or AutoTuneDesiredState.ENABLED
1✔
334
        state = (
1✔
335
            AutoTuneState.ENABLED
336
            if desired_state == AutoTuneDesiredState.ENABLED
337
            else AutoTuneState.DISABLED
338
        )
339
        autotune_options = AutoTuneOptionsOutput(
1✔
340
            State=state,
341
            UseOffPeakWindow=request_options.get(
342
                "UseOffPeakWindow", autotune_options.get("UseOffPeakWindow", False)
343
            ),
344
        )
345
    stored_status["AutoTuneOptions"] = autotune_options
1✔
346

347
    domain_processing_status = stored_status.get("DomainProcessingStatus", None)
1✔
348
    processing = stored_status.get("Processing", True)
1✔
349
    if deleted:
1✔
350
        domain_processing_status = DomainProcessingStatusType.Deleting
1✔
351
        processing = True
1✔
352

353
    new_status = DomainStatus(
1✔
354
        ARN=domain_key.arn,
355
        Created=True,
356
        Deleted=deleted,
357
        DomainProcessingStatus=domain_processing_status,
358
        Processing=processing,
359
        DomainId=f"{domain_key.account}/{domain_key.domain_name}",
360
        DomainName=domain_key.domain_name,
361
        ClusterConfig=ClusterConfig(
362
            DedicatedMasterCount=cluster_cfg.get(
363
                "DedicatedMasterCount", default_cfg["DedicatedMasterCount"]
364
            ),
365
            DedicatedMasterEnabled=cluster_cfg.get(
366
                "DedicatedMasterEnabled", default_cfg["DedicatedMasterEnabled"]
367
            ),
368
            DedicatedMasterType=cluster_cfg.get(
369
                "DedicatedMasterType", default_cfg["DedicatedMasterType"]
370
            ),
371
            InstanceCount=cluster_cfg.get("InstanceCount", default_cfg["InstanceCount"]),
372
            InstanceType=cluster_cfg.get("InstanceType", default_cfg["InstanceType"]),
373
            ZoneAwarenessEnabled=cluster_cfg.get(
374
                "ZoneAwarenessEnabled", default_cfg["ZoneAwarenessEnabled"]
375
            ),
376
            WarmEnabled=False,
377
            ColdStorageOptions=ColdStorageOptions(Enabled=False),
378
        ),
379
        EngineVersion=stored_status.get("EngineVersion") or OPENSEARCH_DEFAULT_VERSION,
380
        Endpoint=stored_status.get("Endpoint", None),
381
        EBSOptions=stored_status.get("EBSOptions")
382
        or EBSOptions(EBSEnabled=True, VolumeType=VolumeType.gp2, VolumeSize=10, Iops=0),
383
        CognitoOptions=CognitoOptions(Enabled=False),
384
        UpgradeProcessing=False,
385
        AccessPolicies=stored_status.get("AccessPolicies", ""),
386
        SnapshotOptions=SnapshotOptions(AutomatedSnapshotStartHour=0),
387
        EncryptionAtRestOptions=EncryptionAtRestOptions(Enabled=False),
388
        NodeToNodeEncryptionOptions=NodeToNodeEncryptionOptions(Enabled=False),
389
        AdvancedOptions={
390
            "override_main_response_version": "false",
391
            "rest.action.multi.allow_explicit_index": "true",
392
            **stored_status.get("AdvancedOptions", {}),
393
        },
394
        ServiceSoftwareOptions=ServiceSoftwareOptions(
395
            CurrentVersion="",
396
            NewVersion="",
397
            UpdateAvailable=False,
398
            Cancellable=False,
399
            UpdateStatus=DeploymentStatus.COMPLETED,
400
            Description="There is no software update available for this domain.",
401
            AutomatedUpdateDate=datetime.fromtimestamp(0, tz=UTC),
402
            OptionalDeployment=True,
403
        ),
404
        DomainEndpointOptions=stored_status.get("DomainEndpointOptions")
405
        or DEFAULT_OPENSEARCH_DOMAIN_ENDPOINT_OPTIONS,
406
        AdvancedSecurityOptions=AdvancedSecurityOptions(
407
            Enabled=False, InternalUserDatabaseEnabled=False
408
        ),
409
        AutoTuneOptions=AutoTuneOptionsOutput(
410
            State=stored_status.get("AutoTuneOptions", {}).get("State"),
411
            UseOffPeakWindow=autotune_options.get("UseOffPeakWindow", False),
412
        ),
413
    )
414
    return new_status
1✔
415

416

417
def _ensure_domain_exists(arn: ARN) -> None:
1✔
418
    """
419
    Checks if the domain for the given ARN exists. Otherwise, a ValidationException is raised.
420

421
    :param arn: ARN string to lookup the domain for
422
    :return: None if the domain exists, otherwise raises an exception
423
    :raises: ValidationException if the domain for the given ARN cannot be found
424
    """
425
    parsed_arn = parse_arn(arn)
1✔
426
    store = OpensearchProvider.get_store(parsed_arn["account"], parsed_arn["region"])
1✔
427
    domain_key = DomainKey.from_arn(arn)
1✔
428
    domain_status = store.opensearch_domains.get(domain_key.domain_name)
1✔
429
    if domain_status is None:
1✔
430
        raise ValidationException("Invalid ARN. Domain not found.")
×
431

432

433
def _update_domain_config_request_to_status(request: UpdateDomainConfigRequest) -> DomainStatus:
1✔
434
    request: dict
435
    request.pop("DryRun", None)
1✔
436
    request.pop("DomainName", None)
1✔
437
    return request
1✔
438

439

440
_domain_name_pattern = re.compile(r"[a-z][a-z0-9\\-]{3,28}")
1✔
441

442

443
def is_valid_domain_name(name: str) -> bool:
1✔
444
    return True if _domain_name_pattern.match(name) else False
1✔
445

446

447
def validate_endpoint_options(endpoint_options: DomainEndpointOptions):
1✔
448
    custom_endpoint = endpoint_options.get("CustomEndpoint", "")
1✔
449
    custom_endpoint_enabled = endpoint_options.get("CustomEndpointEnabled", False)
1✔
450

451
    if custom_endpoint and not custom_endpoint_enabled:
1✔
452
        raise ValidationException(
×
453
            "CustomEndpointEnabled flag should be set in order to use CustomEndpoint."
454
        )
455
    if custom_endpoint_enabled and not custom_endpoint:
1✔
456
        raise ValidationException(
×
457
            "Please provide CustomEndpoint field to create a custom endpoint."
458
        )
459

460

461
class OpensearchProvider(OpensearchApi, ServiceLifecycleHook):
1✔
462
    @staticmethod
1✔
463
    def get_store(account_id: str, region_name: str) -> OpenSearchStore:
1✔
464
        return opensearch_stores[account_id][region_name]
1✔
465

466
    def accept_state_visitor(self, visitor: StateVisitor):
1✔
467
        visitor.visit(opensearch_stores)
×
468
        visitor.visit(AssetDirectory(self.service, os.path.join(config.dirs.data, "opensearch")))
×
469
        visitor.visit(AssetDirectory(self.service, os.path.join(config.dirs.data, "elasticsearch")))
×
470

471
    def on_after_state_load(self):
1✔
472
        """Starts clusters whose metadata has been restored."""
473
        for account_id, region, store in opensearch_stores.iter_stores():
×
474
            for domain_name, domain_status in store.opensearch_domains.items():
×
475
                domain_key = DomainKey(domain_name, region, account_id)
×
476
                if cluster_manager().get(domain_key.arn):
×
477
                    # cluster already restored in previous call to on_after_state_load
478
                    continue
×
479

480
                LOG.info("Restoring domain %s in region %s.", domain_name, region)
×
481
                try:
×
482
                    preferred_port = None
×
483
                    if config.OPENSEARCH_ENDPOINT_STRATEGY == "port":
×
484
                        # try to parse the previous port to re-use it for the re-created cluster
485
                        if "Endpoint" in domain_status:
×
486
                            preferred_port = urlparse(f"http://{domain_status['Endpoint']}").port
×
487

488
                    engine_version = domain_status.get("EngineVersion")
×
489
                    domain_endpoint_options = domain_status.get("DomainEndpointOptions", {})
×
490
                    security_options = SecurityOptions.from_input(
×
491
                        domain_status.get("AdvancedSecurityOptions")
492
                    )
493

494
                    create_cluster(
×
495
                        domain_key=domain_key,
496
                        engine_version=engine_version,
497
                        domain_endpoint_options=domain_endpoint_options,
498
                        security_options=security_options,
499
                        preferred_port=preferred_port,
500
                    )
501
                except Exception:
×
502
                    LOG.error(
×
503
                        "Could not restore domain %s in region %s.",
504
                        domain_name,
505
                        region,
506
                        exc_info=LOG.isEnabledFor(logging.DEBUG),
507
                    )
508

509
    def on_before_state_reset(self):
1✔
510
        self._stop_clusters()
×
511

512
    def on_before_stop(self):
1✔
513
        self._stop_clusters()
1✔
514

515
    def _stop_clusters(self):
1✔
516
        for account_id, region, store in opensearch_stores.iter_stores():
1✔
517
            for domain_name in store.opensearch_domains.keys():
1✔
518
                cluster_manager().remove(DomainKey(domain_name, region, account_id).arn)
×
519

520
    def _add_tags(self, context: RequestContext, arn: ARN, tag_list: TagList) -> None:
1✔
521
        self.get_store(context.account_id, context.region).tags.update_tags(
1✔
522
            arn, {tag["Key"]: tag["Value"] for tag in tag_list}
523
        )
524

525
    def _remove_tags(self, context: RequestContext, arn: ARN, tag_keys: StringList) -> None:
1✔
526
        self.get_store(context.account_id, context.region).tags.delete_tags(arn, tag_keys)
1✔
527

528
    def _remove_all_tags(self, context: RequestContext, arn: ARN) -> None:
1✔
529
        self.get_store(context.account_id, context.region).tags.delete_all_tags(arn)
1✔
530

531
    def _list_tags(self, context: RequestContext, arn: ARN) -> TagList:
1✔
532
        store = self.get_store(context.account_id, context.region)
1✔
533
        tags = store.tags.get_tags(arn)
1✔
534
        return [{"Key": key, "Value": value} for key, value in tags.items()]
1✔
535

536
    @handler("CreateDomain", expand=False)
1✔
537
    def create_domain(
1✔
538
        self, context: RequestContext, request: CreateDomainRequest
539
    ) -> CreateDomainResponse:
540
        store = self.get_store(context.account_id, context.region)
1✔
541

542
        if not (domain_name := request.get("DomainName")) or not is_valid_domain_name(domain_name):
1✔
543
            # TODO: this should use the server-side validation framework at some point.
544
            raise ValidationException(
1✔
545
                "Member must satisfy regular expression pattern: [a-z][a-z0-9\\-]+"
546
            )
547

548
        if domain_endpoint_options := request.get("DomainEndpointOptions", {}):
1✔
549
            validate_endpoint_options(domain_endpoint_options)
1✔
550

551
        with _domain_mutex:
1✔
552
            if domain_name in store.opensearch_domains:
1✔
553
                raise ResourceAlreadyExistsException(
1✔
554
                    f"domain {domain_name} already exists in region {context.region}"
555
                )
556
            domain_key = DomainKey(
1✔
557
                domain_name=domain_name,
558
                region=context.region,
559
                account=context.account_id,
560
            )
561
            security_options = SecurityOptions.from_input(request.get("AdvancedSecurityOptions"))
1✔
562

563
            # "create" domain data
564
            store.opensearch_domains[domain_name] = get_domain_status(domain_key, request=request)
1✔
565
            if domain_endpoint_options:
1✔
566
                store.opensearch_domains[domain_name]["DomainEndpointOptions"] = (
1✔
567
                    DEFAULT_OPENSEARCH_DOMAIN_ENDPOINT_OPTIONS | domain_endpoint_options
568
                )
569

570
            # lazy-init the cluster (sets the Endpoint and Processing flag of the domain status)
571
            # TODO handle additional parameters (cluster config,...)
572
            create_cluster(
1✔
573
                domain_key, request.get("EngineVersion"), domain_endpoint_options, security_options
574
            )
575

576
            # set the tags
577
            if tags := request.get("TagList", []):
1✔
578
                self._add_tags(context, domain_key.arn, tags)
×
579

580
            # get the (updated) status
581
            status = get_domain_status(domain_key)
1✔
582

583
        return CreateDomainResponse(DomainStatus=status)
1✔
584

585
    def delete_domain(
1✔
586
        self, context: RequestContext, domain_name: DomainName, **kwargs
587
    ) -> DeleteDomainResponse:
588
        domain_key = DomainKey(
1✔
589
            domain_name=domain_name,
590
            region=context.region,
591
            account=context.account_id,
592
        )
593
        store = self.get_store(context.account_id, context.region)
1✔
594
        with _domain_mutex:
1✔
595
            if domain_name not in store.opensearch_domains:
1✔
596
                raise ResourceNotFoundException(f"Domain not found: {domain_name}")
1✔
597

598
            status = get_domain_status(domain_key, deleted=True)
1✔
599
            _remove_cluster(domain_key)
1✔
600
            self._remove_all_tags(context, domain_key.arn)
1✔
601

602
        return DeleteDomainResponse(DomainStatus=status)
1✔
603

604
    def describe_domain(
1✔
605
        self, context: RequestContext, domain_name: DomainName, **kwargs
606
    ) -> DescribeDomainResponse:
607
        store = self.get_store(context.account_id, context.region)
1✔
608
        domain_key = DomainKey(
1✔
609
            domain_name=domain_name,
610
            region=context.region,
611
            account=context.account_id,
612
        )
613
        with _domain_mutex:
1✔
614
            if domain_name not in store.opensearch_domains:
1✔
615
                raise ResourceNotFoundException(f"Domain not found: {domain_name}")
×
616

617
            status = get_domain_status(domain_key)
1✔
618
        return DescribeDomainResponse(DomainStatus=status)
1✔
619

620
    @handler("UpdateDomainConfig", expand=False)
1✔
621
    def update_domain_config(
1✔
622
        self, context: RequestContext, payload: UpdateDomainConfigRequest
623
    ) -> UpdateDomainConfigResponse:
624
        domain_key = DomainKey(
1✔
625
            domain_name=payload["DomainName"],
626
            region=context.region,
627
            account=context.account_id,
628
        )
629
        store = self.get_store(context.account_id, context.region)
1✔
630
        with _domain_mutex:
1✔
631
            domain_status = store.opensearch_domains.get(domain_key.domain_name, None)
1✔
632
            if domain_status is None:
1✔
633
                raise ResourceNotFoundException(f"Domain not found: {domain_key.domain_name}")
×
634

635
            if payload.get("AutoTuneOptions"):
1✔
636
                auto_request = payload.pop("AutoTuneOptions")
1✔
637
                desired_state = auto_request.get("DesiredState") or AutoTuneDesiredState.ENABLED
1✔
638

639
                state = (
1✔
640
                    AutoTuneState.ENABLED
641
                    if desired_state == AutoTuneDesiredState.ENABLED
642
                    else AutoTuneState.DISABLED
643
                )
644

645
                current_autotune = domain_status.get("AutoTuneOptions", {})
1✔
646
                domain_status["AutoTuneOptions"] = AutoTuneOptionsOutput(
1✔
647
                    State=state,
648
                    UseOffPeakWindow=auto_request.get(
649
                        "UseOffPeakWindow", current_autotune.get("UseOffPeakWindow", False)
650
                    ),
651
                )
652

653
            status_update: dict = _update_domain_config_request_to_status(payload)
1✔
654
            domain_status.update(status_update)
1✔
655

656
        return UpdateDomainConfigResponse(DomainConfig=_status_to_config(domain_status))
1✔
657

658
    def describe_domains(
1✔
659
        self, context: RequestContext, domain_names: DomainNameList, **kwargs
660
    ) -> DescribeDomainsResponse:
661
        status_list = []
1✔
662
        with _domain_mutex:
1✔
663
            for domain_name in domain_names:
1✔
664
                try:
1✔
665
                    domain_status = self.describe_domain(context, domain_name)["DomainStatus"]
1✔
666
                    status_list.append(domain_status)
1✔
667
                except ResourceNotFoundException:
×
668
                    # ResourceNotFoundExceptions are ignored, we just look for the next domain.
669
                    # If no domain can be found, the result will just be empty.
670
                    pass
×
671
        return DescribeDomainsResponse(DomainStatusList=status_list)
1✔
672

673
    def list_domain_names(
1✔
674
        self, context: RequestContext, engine_type: EngineType = None, **kwargs
675
    ) -> ListDomainNamesResponse:
676
        store = self.get_store(context.account_id, context.region)
1✔
677
        domain_names = [
1✔
678
            DomainInfo(
679
                DomainName=DomainName(domain_name),
680
                EngineType=versions.get_engine_type(domain["EngineVersion"]),
681
            )
682
            for domain_name, domain in store.opensearch_domains.items()
683
            if engine_type is None
684
            or versions.get_engine_type(domain["EngineVersion"]) == engine_type
685
        ]
686
        return ListDomainNamesResponse(DomainNames=domain_names)
1✔
687

688
    def list_versions(
1✔
689
        self,
690
        context: RequestContext,
691
        max_results: MaxResults = None,
692
        next_token: NextToken = None,
693
        **kwargs,
694
    ) -> ListVersionsResponse:
695
        version_list = PaginatedList(versions.install_versions.keys())
1✔
696
        page, nxt = version_list.get_page(
1✔
697
            lambda x: x,
698
            next_token=next_token,
699
            page_size=max_results,
700
        )
701
        response = ListVersionsResponse(Versions=page, NextToken=nxt)
1✔
702
        return remove_none_values_from_dict(response)
1✔
703

704
    def get_compatible_versions(
1✔
705
        self, context: RequestContext, domain_name: DomainName = None, **kwargs
706
    ) -> GetCompatibleVersionsResponse:
707
        version_filter = None
1✔
708
        if domain_name:
1✔
709
            store = self.get_store(context.account_id, context.region)
1✔
710
            with _domain_mutex:
1✔
711
                domain = store.opensearch_domains.get(domain_name)
1✔
712
                if not domain:
1✔
713
                    raise ResourceNotFoundException(f"Domain not found: {domain_name}")
×
714
                version_filter = domain.get("EngineVersion")
1✔
715
        compatible_versions = list(versions.compatible_versions)
1✔
716
        if version_filter is not None:
1✔
717
            compatible_versions = [
1✔
718
                comp
719
                for comp in versions.compatible_versions
720
                if comp["SourceVersion"] == version_filter
721
            ]
722
            if not compatible_versions:
1✔
723
                compatible_versions = [
1✔
724
                    CompatibleVersionsMap(SourceVersion=version_filter, TargetVersions=[])
725
                ]
726
        return GetCompatibleVersionsResponse(CompatibleVersions=compatible_versions)
1✔
727

728
    def describe_domain_config(
1✔
729
        self, context: RequestContext, domain_name: DomainName, **kwargs
730
    ) -> DescribeDomainConfigResponse:
731
        domain_key = DomainKey(
1✔
732
            domain_name=domain_name,
733
            region=context.region,
734
            account=context.account_id,
735
        )
736
        store = self.get_store(context.account_id, context.region)
1✔
737
        with _domain_mutex:
1✔
738
            if domain_name not in store.opensearch_domains:
1✔
739
                raise ResourceNotFoundException(f"Domain not found: {domain_name}")
×
740
            domain_config = get_domain_config(domain_key)
1✔
741
        return DescribeDomainConfigResponse(DomainConfig=domain_config)
1✔
742

743
    def add_tags(self, context: RequestContext, arn: ARN, tag_list: TagList, **kwargs) -> None:
1✔
744
        _ensure_domain_exists(arn)
1✔
745
        self._add_tags(context, arn, tag_list)
1✔
746

747
    def list_tags(self, context: RequestContext, arn: ARN, **kwargs) -> ListTagsResponse:
1✔
748
        _ensure_domain_exists(arn)
1✔
749
        tag_list = self._list_tags(context, arn)
1✔
750
        return ListTagsResponse(TagList=tag_list)
1✔
751

752
    def remove_tags(
1✔
753
        self, context: RequestContext, arn: ARN, tag_keys: StringList, **kwargs
754
    ) -> None:
755
        _ensure_domain_exists(arn)
1✔
756
        self._remove_tags(context, arn, tag_keys)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc