• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19880423371

02 Dec 2025 08:25PM UTC coverage: 86.905% (-0.04%) from 86.945%
19880423371

push

github

web-flow
fix/external client CA bundle (#13451)

1 of 5 new or added lines in 1 file covered. (20.0%)

414 existing lines in 19 files now uncovered.

69738 of 80246 relevant lines covered (86.91%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.72
/localstack-core/localstack/services/route53resolver/provider.py
1
from datetime import UTC, datetime
1✔
2

3
from moto.route53resolver.models import Route53ResolverBackend as MotoRoute53ResolverBackend
1✔
4
from moto.route53resolver.models import route53resolver_backends
1✔
5

6
import localstack.services.route53resolver.utils
1✔
7
from localstack.aws.api import RequestContext
1✔
8
from localstack.aws.api.route53resolver import (
1✔
9
    Action,
10
    AssociateFirewallRuleGroupResponse,
11
    AssociateResolverQueryLogConfigResponse,
12
    BlockOverrideDnsType,
13
    BlockOverrideDomain,
14
    BlockOverrideTtl,
15
    BlockResponse,
16
    ConfidenceThreshold,
17
    CreateFirewallDomainListResponse,
18
    CreateFirewallRuleGroupResponse,
19
    CreateFirewallRuleResponse,
20
    CreateResolverEndpointResponse,
21
    CreateResolverQueryLogConfigResponse,
22
    CreatorRequestId,
23
    DeleteFirewallDomainListResponse,
24
    DeleteFirewallRuleGroupResponse,
25
    DeleteFirewallRuleResponse,
26
    DeleteResolverQueryLogConfigResponse,
27
    DestinationArn,
28
    DisassociateFirewallRuleGroupResponse,
29
    DisassociateResolverQueryLogConfigResponse,
30
    DnsThreatProtection,
31
    Filters,
32
    FirewallConfig,
33
    FirewallDomainList,
34
    FirewallDomainListMetadata,
35
    FirewallDomainName,
36
    FirewallDomainRedirectionAction,
37
    FirewallDomains,
38
    FirewallDomainUpdateOperation,
39
    FirewallFailOpenStatus,
40
    FirewallRule,
41
    FirewallRuleGroup,
42
    FirewallRuleGroupAssociation,
43
    FirewallRuleGroupMetadata,
44
    GetFirewallConfigResponse,
45
    GetFirewallDomainListResponse,
46
    GetFirewallRuleGroupAssociationResponse,
47
    GetFirewallRuleGroupResponse,
48
    GetResolverQueryLogConfigAssociationResponse,
49
    GetResolverQueryLogConfigResponse,
50
    InvalidParameterException,
51
    InvalidRequestException,
52
    IpAddressesRequest,
53
    ListDomainMaxResults,
54
    ListFirewallConfigsMaxResult,
55
    ListFirewallConfigsResponse,
56
    ListFirewallDomainListsResponse,
57
    ListFirewallDomainsResponse,
58
    ListFirewallRuleGroupsResponse,
59
    ListFirewallRulesResponse,
60
    ListResolverQueryLogConfigAssociationsResponse,
61
    ListResolverQueryLogConfigsResponse,
62
    MaxResults,
63
    MutationProtectionStatus,
64
    Name,
65
    NextToken,
66
    OutpostArn,
67
    OutpostInstanceType,
68
    Priority,
69
    ProtocolList,
70
    Qtype,
71
    ResolverEndpointDirection,
72
    ResolverEndpointType,
73
    ResolverQueryLogConfig,
74
    ResolverQueryLogConfigAssociation,
75
    ResolverQueryLogConfigName,
76
    ResolverQueryLogConfigStatus,
77
    ResourceId,
78
    ResourceNotFoundException,
79
    Route53ResolverApi,
80
    SecurityGroupIds,
81
    SortByKey,
82
    SortOrder,
83
    TagList,
84
    UpdateFirewallConfigResponse,
85
    UpdateFirewallDomainsResponse,
86
    UpdateFirewallRuleGroupAssociationResponse,
87
    UpdateFirewallRuleResponse,
88
    ValidationException,
89
)
90
from localstack.services.ec2.models import get_ec2_backend
1✔
91
from localstack.services.moto import call_moto
1✔
92
from localstack.services.route53resolver.models import Route53ResolverStore, route53resolver_stores
1✔
93
from localstack.services.route53resolver.utils import (
1✔
94
    get_resolver_query_log_config_id,
95
    get_route53_resolver_firewall_domain_list_id,
96
    get_route53_resolver_firewall_rule_group_association_id,
97
    get_route53_resolver_firewall_rule_group_id,
98
    get_route53_resolver_query_log_config_association_id,
99
    validate_destination_arn,
100
    validate_mutation_protection,
101
    validate_priority,
102
)
103
from localstack.state import StateVisitor
1✔
104
from localstack.utils.aws import arns
1✔
105
from localstack.utils.aws.arns import extract_account_id_from_arn, extract_region_from_arn
1✔
106
from localstack.utils.collections import select_from_typed_dict
1✔
107
from localstack.utils.patch import patch
1✔
108

109

110
class Route53ResolverProvider(Route53ResolverApi):
1✔
111
    def accept_state_visitor(self, visitor: StateVisitor):
1✔
UNCOV
112
        visitor.visit(route53resolver_backends)
×
UNCOV
113
        visitor.visit(route53resolver_stores)
×
114

115
    @staticmethod
1✔
116
    def get_store(account_id: str, region: str) -> Route53ResolverStore:
1✔
117
        return route53resolver_stores[account_id][region]
1✔
118

119
    def create_firewall_rule_group(
1✔
120
        self,
121
        context: RequestContext,
122
        creator_request_id: CreatorRequestId,
123
        name: Name,
124
        tags: TagList = None,
125
        **kwargs,
126
    ) -> CreateFirewallRuleGroupResponse:
127
        """Create a Firewall Rule Group."""
128
        store = self.get_store(context.account_id, context.region)
1✔
129
        firewall_rule_group_id = get_route53_resolver_firewall_rule_group_id()
1✔
130
        arn = arns.route53_resolver_firewall_rule_group_arn(
1✔
131
            firewall_rule_group_id, context.account_id, context.region
132
        )
133
        firewall_rule_group = FirewallRuleGroup(
1✔
134
            Id=firewall_rule_group_id,
135
            Arn=arn,
136
            Name=name,
137
            RuleCount=0,
138
            Status="COMPLETE",
139
            OwnerId=context.account_id,
140
            ShareStatus="NOT_SHARED",
141
            StatusMessage="Created Firewall Rule Group",
142
            CreatorRequestId=creator_request_id,
143
            CreationTime=datetime.now(UTC).isoformat(),
144
            ModificationTime=datetime.now(UTC).isoformat(),
145
        )
146
        store.firewall_rule_groups[firewall_rule_group_id] = firewall_rule_group
1✔
147
        store.firewall_rules[firewall_rule_group_id] = {}
1✔
148
        route53resolver_backends[context.account_id][context.region].tagger.tag_resource(
1✔
149
            arn, tags or []
150
        )
151
        return CreateFirewallRuleGroupResponse(FirewallRuleGroup=firewall_rule_group)
1✔
152

153
    def delete_firewall_rule_group(
1✔
154
        self, context: RequestContext, firewall_rule_group_id: ResourceId, **kwargs
155
    ) -> DeleteFirewallRuleGroupResponse:
156
        """Delete a Firewall Rule Group."""
157
        store = self.get_store(context.account_id, context.region)
1✔
158
        firewall_rule_group: FirewallRuleGroup = store.delete_firewall_rule_group(
1✔
159
            firewall_rule_group_id
160
        )
161
        return DeleteFirewallRuleGroupResponse(FirewallRuleGroup=firewall_rule_group)
1✔
162

163
    def get_firewall_rule_group(
1✔
164
        self, context: RequestContext, firewall_rule_group_id: ResourceId, **kwargs
165
    ) -> GetFirewallRuleGroupResponse:
166
        """Get the details of a Firewall Rule Group."""
UNCOV
167
        store = self.get_store(context.account_id, context.region)
×
UNCOV
168
        firewall_rule_group: FirewallRuleGroup = store.get_firewall_rule_group(
×
169
            firewall_rule_group_id
170
        )
UNCOV
171
        return GetFirewallRuleGroupResponse(FirewallRuleGroup=firewall_rule_group)
×
172

173
    def list_firewall_rule_groups(
1✔
174
        self,
175
        context: RequestContext,
176
        max_results: MaxResults = None,
177
        next_token: NextToken = None,
178
        **kwargs,
179
    ) -> ListFirewallRuleGroupsResponse:
180
        """List Firewall Rule Groups."""
UNCOV
181
        store = self.get_store(context.account_id, context.region)
×
182
        firewall_rule_groups = []
×
UNCOV
183
        for firewall_rule_group in store.firewall_rule_groups.values():
×
UNCOV
184
            firewall_rule_groups.append(
×
185
                select_from_typed_dict(FirewallRuleGroupMetadata, firewall_rule_group)
186
            )
UNCOV
187
        return ListFirewallRuleGroupsResponse(FirewallRuleGroups=firewall_rule_groups)
×
188

189
    def create_firewall_domain_list(
1✔
190
        self,
191
        context: RequestContext,
192
        creator_request_id: CreatorRequestId,
193
        name: Name,
194
        tags: TagList = None,
195
        **kwargs,
196
    ) -> CreateFirewallDomainListResponse:
197
        """Create a Firewall Domain List."""
198
        store = self.get_store(context.account_id, context.region)
1✔
199
        id = get_route53_resolver_firewall_domain_list_id()
1✔
200
        arn = arns.route53_resolver_firewall_domain_list_arn(id, context.account_id, context.region)
1✔
201
        firewall_domain_list = FirewallDomainList(
1✔
202
            Id=id,
203
            Arn=arn,
204
            Name=name,
205
            DomainCount=0,
206
            Status="COMPLETE",
207
            StatusMessage="Created Firewall Domain List",
208
            ManagedOwnerName=context.account_id,
209
            CreatorRequestId=creator_request_id,
210
            CreationTime=datetime.now(UTC).isoformat(),
211
            ModificationTime=datetime.now(UTC).isoformat(),
212
        )
213
        store.firewall_domain_lists[id] = firewall_domain_list
1✔
214
        route53resolver_backends[context.account_id][context.region].tagger.tag_resource(
1✔
215
            arn, tags or []
216
        )
217
        return CreateFirewallDomainListResponse(FirewallDomainList=firewall_domain_list)
1✔
218

219
    def delete_firewall_domain_list(
1✔
220
        self, context: RequestContext, firewall_domain_list_id: ResourceId, **kwargs
221
    ) -> DeleteFirewallDomainListResponse:
222
        """Delete a Firewall Domain List."""
223
        store = self.get_store(context.account_id, context.region)
1✔
224
        firewall_domain_list: FirewallDomainList = store.delete_firewall_domain_list(
1✔
225
            firewall_domain_list_id
226
        )
227
        return DeleteFirewallDomainListResponse(FirewallDomainList=firewall_domain_list)
1✔
228

229
    def get_firewall_domain_list(
1✔
230
        self, context: RequestContext, firewall_domain_list_id: ResourceId, **kwargs
231
    ) -> GetFirewallDomainListResponse:
232
        """Get the details of a Firewall Domain List."""
UNCOV
233
        store = self.get_store(context.account_id, context.region)
×
UNCOV
234
        firewall_domain_list: FirewallDomainList = store.get_firewall_domain_list(
×
235
            firewall_domain_list_id
236
        )
UNCOV
237
        return GetFirewallDomainListResponse(FirewallDomainList=firewall_domain_list)
×
238

239
    def list_firewall_domain_lists(
1✔
240
        self,
241
        context: RequestContext,
242
        max_results: MaxResults = None,
243
        next_token: NextToken = None,
244
        **kwargs,
245
    ) -> ListFirewallDomainListsResponse:
246
        """List all Firewall Domain Lists."""
247
        store = self.get_store(context.account_id, context.region)
1✔
248
        firewall_domain_lists = []
1✔
249
        for firewall_domain_list in store.firewall_domain_lists.values():
1✔
250
            firewall_domain_lists.append(
1✔
251
                select_from_typed_dict(FirewallDomainListMetadata, firewall_domain_list)
252
            )
253
        return ListFirewallDomainListsResponse(FirewallDomainLists=firewall_domain_lists)
1✔
254

255
    def update_firewall_domains(
1✔
256
        self,
257
        context: RequestContext,
258
        firewall_domain_list_id: ResourceId,
259
        operation: FirewallDomainUpdateOperation,
260
        domains: FirewallDomains,
261
        **kwargs,
262
    ) -> UpdateFirewallDomainsResponse:
263
        """Update the domains in a Firewall Domain List."""
264
        store = self.get_store(context.account_id, context.region)
×
265

266
        firewall_domain_list: FirewallDomainList = store.get_firewall_domain_list(
×
267
            firewall_domain_list_id
268
        )
UNCOV
269
        firewall_domains = store.get_firewall_domain(firewall_domain_list_id)
×
270

UNCOV
271
        if operation == FirewallDomainUpdateOperation.ADD:
×
272
            if not firewall_domains:
×
273
                store.firewall_domains[firewall_domain_list_id] = domains
×
274
            else:
275
                store.firewall_domains[firewall_domain_list_id].append(domains)
×
276

UNCOV
277
        if operation == FirewallDomainUpdateOperation.REMOVE:
×
278
            if firewall_domains:
×
UNCOV
279
                for domain in domains:
×
UNCOV
280
                    if domain in firewall_domains:
×
UNCOV
281
                        firewall_domains.remove(domain)
×
282
                    else:
283
                        raise ValidationException(
×
284
                            f"[RSLVR-02502] The following domains don't exist in the DNS Firewall domain list '{firewall_domain_list_id}'. You can't delete a domain that isn't in a domain list. Example unknown domain: '{domain}'. Trace Id: '{localstack.services.route53resolver.utils.get_trace_id()}'"
285
                        )
286

287
        if operation == FirewallDomainUpdateOperation.REPLACE:
×
UNCOV
288
            store.firewall_domains[firewall_domain_list_id] = domains
×
289

UNCOV
290
        firewall_domain_list["StatusMessage"] = "Finished domain list update"
×
UNCOV
291
        firewall_domain_list["ModificationTime"] = datetime.now(UTC).isoformat()
×
UNCOV
292
        return UpdateFirewallDomainsResponse(
×
293
            Id=firewall_domain_list.get("Id"),
294
            Name=firewall_domain_list.get("Name"),
295
            Status=firewall_domain_list.get("Status"),
296
            StatusMessage=firewall_domain_list.get("StatusMessage"),
297
        )
298

299
    def list_firewall_domains(
1✔
300
        self,
301
        context: RequestContext,
302
        firewall_domain_list_id: ResourceId,
303
        max_results: ListDomainMaxResults = None,
304
        next_token: NextToken = None,
305
        **kwargs,
306
    ) -> ListFirewallDomainsResponse:
307
        """List the domains in a DNS Firewall domain list."""
308
        store = self.get_store(context.account_id, context.region)
×
UNCOV
309
        firewall_domains: FirewallDomains[FirewallDomainName] = []
×
UNCOV
310
        if store.firewall_domains.get(firewall_domain_list_id):
×
UNCOV
311
            for firewall_domain in store.firewall_domains.get(firewall_domain_list_id):
×
UNCOV
312
                firewall_domains.append(FirewallDomainName(firewall_domain))
×
UNCOV
313
        return ListFirewallDomainsResponse(Domains=firewall_domains)
×
314

315
    def create_firewall_rule(
1✔
316
        self,
317
        context: RequestContext,
318
        creator_request_id: CreatorRequestId,
319
        firewall_rule_group_id: ResourceId,
320
        priority: Priority,
321
        action: Action,
322
        name: Name,
323
        firewall_domain_list_id: ResourceId = None,
324
        block_response: BlockResponse = None,
325
        block_override_domain: BlockOverrideDomain = None,
326
        block_override_dns_type: BlockOverrideDnsType = None,
327
        block_override_ttl: BlockOverrideTtl = None,
328
        firewall_domain_redirection_action: FirewallDomainRedirectionAction = None,
329
        qtype: Qtype = None,
330
        dns_threat_protection: DnsThreatProtection = None,
331
        confidence_threshold: ConfidenceThreshold = None,
332
        **kwargs,
333
    ) -> CreateFirewallRuleResponse:
334
        """Create a new firewall rule"""
335
        # TODO add support for firewall_domain_list_id, dns_threat_protection, and confidence_threshold
336
        store = self.get_store(context.account_id, context.region)
1✔
337
        firewall_rule = FirewallRule(
1✔
338
            FirewallRuleGroupId=firewall_rule_group_id,
339
            FirewallDomainListId=firewall_domain_list_id,
340
            Name=name,
341
            Priority=priority,
342
            Action=action,
343
            BlockResponse=block_response,
344
            BlockOverrideDomain=block_override_domain,
345
            BlockOverrideDnsType=block_override_dns_type,
346
            BlockOverrideTtl=block_override_ttl,
347
            CreatorRequestId=creator_request_id,
348
            CreationTime=datetime.now(UTC).isoformat(),
349
            ModificationTime=datetime.now(UTC).isoformat(),
350
            FirewallDomainRedirectionAction=firewall_domain_redirection_action,
351
            Qtype=qtype,
352
        )
353
        if firewall_rule_group_id in store.firewall_rules:
1✔
354
            store.firewall_rules[firewall_rule_group_id][firewall_domain_list_id] = firewall_rule
1✔
355
        # TODO: handle missing firewall-rule-group-id
356
        return CreateFirewallRuleResponse(FirewallRule=firewall_rule)
1✔
357

358
    def delete_firewall_rule(
1✔
359
        self,
360
        context: RequestContext,
361
        firewall_rule_group_id: ResourceId,
362
        firewall_domain_list_id: ResourceId = None,
363
        firewall_threat_protection_id: ResourceId = None,
364
        qtype: Qtype = None,
365
        **kwargs,
366
    ) -> DeleteFirewallRuleResponse:
367
        """Delete a firewall rule"""
368
        store = self.get_store(context.account_id, context.region)
1✔
369
        firewall_rule: FirewallRule = store.delete_firewall_rule(
1✔
370
            firewall_rule_group_id, firewall_domain_list_id
371
        )
372
        return DeleteFirewallRuleResponse(
1✔
373
            FirewallRule=firewall_rule,
374
        )
375

376
    def list_firewall_rules(
1✔
377
        self,
378
        context: RequestContext,
379
        firewall_rule_group_id: ResourceId,
380
        priority: Priority = None,
381
        action: Action = None,
382
        max_results: MaxResults = None,
383
        next_token: NextToken = None,
384
        **kwargs,
385
    ) -> ListFirewallRulesResponse:
386
        """List firewall rules in a firewall rule group.
387

388
        Rules will be filtered by priority and action if values for these params are provided.
389

390
        Raises:
391
            ResourceNotFound: If a firewall group by the provided id does not exist.
392
        """
393
        store = self.get_store(context.account_id, context.region)
1✔
394
        firewall_rule_group = store.firewall_rules.get(firewall_rule_group_id)
1✔
395
        if firewall_rule_group is None:
1✔
396
            raise ResourceNotFoundException(
1✔
397
                f"Can't find the resource with ID '{firewall_rule_group_id}'. Trace Id: '{localstack.services.route53resolver.utils.get_trace_id()}'"
398
            )
399

400
        firewall_rules = [
1✔
401
            FirewallRule(rule)
402
            for rule in firewall_rule_group.values()
403
            if (action is None or action == rule["Action"])
404
            and (priority is None or priority == rule["Priority"])
405
        ]
406

407
        # TODO: implement max_results filtering and next_token handling
408
        return ListFirewallRulesResponse(FirewallRules=firewall_rules)
1✔
409

410
    def update_firewall_rule(
1✔
411
        self,
412
        context: RequestContext,
413
        firewall_rule_group_id: ResourceId,
414
        firewall_domain_list_id: ResourceId = None,
415
        firewall_threat_protection_id: ResourceId = None,
416
        priority: Priority = None,
417
        action: Action = None,
418
        block_response: BlockResponse = None,
419
        block_override_domain: BlockOverrideDomain = None,
420
        block_override_dns_type: BlockOverrideDnsType = None,
421
        block_override_ttl: BlockOverrideTtl = None,
422
        name: Name = None,
423
        firewall_domain_redirection_action: FirewallDomainRedirectionAction = None,
424
        qtype: Qtype = None,
425
        dns_threat_protection: DnsThreatProtection = None,
426
        confidence_threshold: ConfidenceThreshold = None,
427
        **kwargs,
428
    ) -> UpdateFirewallRuleResponse:
429
        """Updates a firewall rule"""
430
        store = self.get_store(context.account_id, context.region)
×
431
        firewall_rule: FirewallRule = store.get_firewall_rule(
×
432
            firewall_rule_group_id, firewall_domain_list_id
433
        )
434

435
        if priority:
×
436
            firewall_rule["Priority"] = priority
×
437
        if action:
×
438
            firewall_rule["Action"] = action
×
439
        if block_response:
×
440
            firewall_rule["BlockResponse"] = block_response
×
441
        if block_override_domain:
×
442
            firewall_rule["BlockOverrideDomain"] = block_override_domain
×
443
        if block_override_dns_type:
×
444
            firewall_rule["BlockOverrideDnsType"] = block_override_dns_type
×
445
        if block_override_ttl:
×
446
            firewall_rule["BlockOverrideTtl"] = block_override_ttl
×
447
        if name:
×
448
            firewall_rule["Name"] = name
×
UNCOV
449
        if firewall_domain_redirection_action:
×
UNCOV
450
            firewall_rule["FirewallDomainRedirectionAction"] = firewall_domain_redirection_action
×
UNCOV
451
        if qtype:
×
UNCOV
452
            firewall_rule["Qtype"] = qtype
×
UNCOV
453
        return UpdateFirewallRuleResponse(
×
454
            FirewallRule=firewall_rule,
455
        )
456

457
    def associate_firewall_rule_group(
1✔
458
        self,
459
        context: RequestContext,
460
        creator_request_id: CreatorRequestId,
461
        firewall_rule_group_id: ResourceId,
462
        vpc_id: ResourceId,
463
        priority: Priority,
464
        name: Name,
465
        mutation_protection: MutationProtectionStatus = None,
466
        tags: TagList = None,
467
        **kwargs,
468
    ) -> AssociateFirewallRuleGroupResponse:
469
        """Associate a firewall rule group with a VPC."""
470
        store = self.get_store(context.account_id, context.region)
×
UNCOV
471
        validate_priority(priority=priority)
×
UNCOV
472
        validate_mutation_protection(mutation_protection=mutation_protection)
×
473

UNCOV
474
        for firewall_rule_group_association in store.firewall_rule_group_associations.values():
×
475
            if (
×
476
                firewall_rule_group_association.get("VpcId") == vpc_id
477
                and firewall_rule_group_association.get("FirewallRuleGroupId")
478
                == firewall_rule_group_id
479
            ):
480
                raise ValidationException(
×
481
                    f"[RSLVR-02302] This DNS Firewall rule group can't be associated to a VPC: '{vpc_id}'. It is already associated to VPC '{firewall_rule_group_id}'. Try again with another VPC or DNS Firewall rule group. Trace Id: '{localstack.services.route53resolver.utils.get_trace_id()}'"
482
                )
483

484
        id = get_route53_resolver_firewall_rule_group_association_id()
×
UNCOV
485
        arn = arns.route53_resolver_firewall_rule_group_associations_arn(
×
486
            id, context.account_id, context.region
487
        )
488

UNCOV
489
        firewall_rule_group_association = FirewallRuleGroupAssociation(
×
490
            Id=id,
491
            Arn=arn,
492
            FirewallRuleGroupId=firewall_rule_group_id,
493
            VpcId=vpc_id,
494
            Name=name,
495
            Priority=priority,
496
            MutationProtection=mutation_protection or "DISABLED",
497
            Status="COMPLETE",
498
            StatusMessage="Creating Firewall Rule Group Association",
499
            CreatorRequestId=creator_request_id,
500
            CreationTime=datetime.now(UTC).isoformat(),
501
            ModificationTime=datetime.now(UTC).isoformat(),
502
        )
UNCOV
503
        store.firewall_rule_group_associations[id] = firewall_rule_group_association
×
UNCOV
504
        route53resolver_backends[context.account_id][context.region].tagger.tag_resource(
×
505
            arn, tags or []
506
        )
UNCOV
507
        return AssociateFirewallRuleGroupResponse(
×
508
            FirewallRuleGroupAssociation=firewall_rule_group_association
509
        )
510

511
    def disassociate_firewall_rule_group(
1✔
512
        self, context: RequestContext, firewall_rule_group_association_id: ResourceId, **kwargs
513
    ) -> DisassociateFirewallRuleGroupResponse:
514
        """Disassociate a DNS Firewall rule group from a VPC."""
UNCOV
515
        store = self.get_store(context.account_id, context.region)
×
UNCOV
516
        firewall_rule_group_association: FirewallRuleGroupAssociation = (
×
517
            store.delete_firewall_rule_group_association(firewall_rule_group_association_id)
518
        )
UNCOV
519
        return DisassociateFirewallRuleGroupResponse(
×
520
            FirewallRuleGroupAssociation=firewall_rule_group_association
521
        )
522

523
    def get_firewall_rule_group_association(
1✔
524
        self, context: RequestContext, firewall_rule_group_association_id: ResourceId, **kwargs
525
    ) -> GetFirewallRuleGroupAssociationResponse:
526
        """Returns the Firewall Rule Group Association that you specified."""
UNCOV
527
        store = self.get_store(context.account_id, context.region)
×
UNCOV
528
        firewall_rule_group_association: FirewallRuleGroupAssociation = (
×
529
            store.get_firewall_rule_group_association(firewall_rule_group_association_id)
530
        )
UNCOV
531
        return GetFirewallRuleGroupAssociationResponse(
×
532
            FirewallRuleGroupAssociation=firewall_rule_group_association
533
        )
534

535
    def update_firewall_rule_group_association(
1✔
536
        self,
537
        context: RequestContext,
538
        firewall_rule_group_association_id: ResourceId,
539
        priority: Priority = None,
540
        mutation_protection: MutationProtectionStatus = None,
541
        name: Name = None,
542
        **kwargs,
543
    ) -> UpdateFirewallRuleGroupAssociationResponse:
544
        """Updates the specified Firewall Rule Group Association."""
UNCOV
545
        store = self.get_store(context.account_id, context.region)
×
UNCOV
546
        validate_priority(priority=priority)
×
UNCOV
547
        validate_mutation_protection(mutation_protection=mutation_protection)
×
548

549
        firewall_rule_group_association: FirewallRuleGroupAssociation = (
×
550
            store.get_firewall_rule_group_association(firewall_rule_group_association_id)
551
        )
552

553
        if priority:
×
UNCOV
554
            firewall_rule_group_association["Priority"] = priority
×
555
        if mutation_protection:
×
UNCOV
556
            firewall_rule_group_association["MutationProtection"] = mutation_protection
×
UNCOV
557
        if name:
×
UNCOV
558
            firewall_rule_group_association["Name"] = name
×
559

UNCOV
560
        return UpdateFirewallRuleGroupAssociationResponse(
×
561
            FirewallRuleGroupAssociation=firewall_rule_group_association
562
        )
563

564
    def create_resolver_query_log_config(
1✔
565
        self,
566
        context: RequestContext,
567
        name: ResolverQueryLogConfigName,
568
        destination_arn: DestinationArn,
569
        creator_request_id: CreatorRequestId,
570
        tags: TagList = None,
571
        **kwargs,
572
    ) -> CreateResolverQueryLogConfigResponse:
573
        store = self.get_store(context.account_id, context.region)
1✔
574
        validate_destination_arn(destination_arn)
1✔
575
        id = get_resolver_query_log_config_id()
1✔
576
        arn = arns.route53_resolver_query_log_config_arn(id, context.account_id, context.region)
1✔
577
        resolver_query_log_config: ResolverQueryLogConfig = ResolverQueryLogConfig(
1✔
578
            Id=id,
579
            Arn=arn,
580
            Name=name,
581
            AssociationCount=0,
582
            Status="CREATED",
583
            OwnerId=context.account_id,
584
            ShareStatus="NOT_SHARED",
585
            DestinationArn=destination_arn,
586
            CreatorRequestId=creator_request_id,
587
            CreationTime=datetime.now(UTC).isoformat(),
588
        )
589
        store.resolver_query_log_configs[id] = resolver_query_log_config
1✔
590
        route53resolver_backends[context.account_id][context.region].tagger.tag_resource(
1✔
591
            arn, tags or []
592
        )
593
        return CreateResolverQueryLogConfigResponse(
1✔
594
            ResolverQueryLogConfig=resolver_query_log_config
595
        )
596

597
    def create_resolver_endpoint(
1✔
598
        self,
599
        context: RequestContext,
600
        creator_request_id: CreatorRequestId,
601
        security_group_ids: SecurityGroupIds,
602
        direction: ResolverEndpointDirection,
603
        ip_addresses: IpAddressesRequest,
604
        name: Name = None,
605
        outpost_arn: OutpostArn = None,
606
        preferred_instance_type: OutpostInstanceType = None,
607
        tags: TagList = None,
608
        resolver_endpoint_type: ResolverEndpointType = None,
609
        protocols: ProtocolList = None,
610
        **kwargs,
611
    ) -> CreateResolverEndpointResponse:
612
        create_resolver_endpoint_resp = call_moto(context)
1✔
613
        create_resolver_endpoint_resp["ResolverEndpoint"]["Status"] = (
1✔
614
            ResolverQueryLogConfigStatus.CREATING
615
        )
616
        return create_resolver_endpoint_resp
1✔
617

618
    def get_resolver_query_log_config(
1✔
619
        self, context: RequestContext, resolver_query_log_config_id: ResourceId, **kwargs
620
    ) -> GetResolverQueryLogConfigResponse:
UNCOV
621
        store = self.get_store(context.account_id, context.region)
×
UNCOV
622
        resolver_query_log_config: ResolverQueryLogConfig = store.get_resolver_query_log_config(
×
623
            resolver_query_log_config_id
624
        )
UNCOV
625
        return GetResolverQueryLogConfigResponse(ResolverQueryLogConfig=resolver_query_log_config)
×
626

627
    def delete_resolver_query_log_config(
1✔
628
        self, context: RequestContext, resolver_query_log_config_id: ResourceId, **kwargs
629
    ) -> DeleteResolverQueryLogConfigResponse:
630
        store = self.get_store(context.account_id, context.region)
1✔
631
        resolver_query_log_config: ResolverQueryLogConfig = store.delete_resolver_query_log_config(
1✔
632
            resolver_query_log_config_id
633
        )
634
        return DeleteResolverQueryLogConfigResponse(
1✔
635
            ResolverQueryLogConfig=resolver_query_log_config
636
        )
637

638
    def list_resolver_query_log_configs(
1✔
639
        self,
640
        context: RequestContext,
641
        max_results: MaxResults = None,
642
        next_token: NextToken = None,
643
        filters: Filters = None,
644
        sort_by: SortByKey = None,
645
        sort_order: SortOrder = None,
646
        **kwargs,
647
    ) -> ListResolverQueryLogConfigsResponse:
648
        store = self.get_store(context.account_id, context.region)
1✔
649
        resolver_query_log_configs = []
1✔
650
        for resolver_query_log_config in store.resolver_query_log_configs.values():
1✔
651
            resolver_query_log_configs.append(ResolverQueryLogConfig(resolver_query_log_config))
1✔
652
        return ListResolverQueryLogConfigsResponse(
1✔
653
            ResolverQueryLogConfigs=resolver_query_log_configs,
654
            TotalCount=len(resolver_query_log_configs),
655
        )
656

657
    def associate_resolver_query_log_config(
1✔
658
        self,
659
        context: RequestContext,
660
        resolver_query_log_config_id: ResourceId,
661
        resource_id: ResourceId,
662
        **kwargs,
663
    ) -> AssociateResolverQueryLogConfigResponse:
UNCOV
664
        store = self.get_store(context.account_id, context.region)
×
UNCOV
665
        id = get_route53_resolver_query_log_config_association_id()
×
666

UNCOV
667
        resolver_query_log_config_association: ResolverQueryLogConfigAssociation = (
×
668
            ResolverQueryLogConfigAssociation(
669
                Id=id,
670
                ResolverQueryLogConfigId=resolver_query_log_config_id,
671
                ResourceId=resource_id,
672
                Status="ACTIVE",
673
                Error="NONE",
674
                ErrorMessage="",
675
                CreationTime=datetime.now(UTC).isoformat(),
676
            )
677
        )
678

UNCOV
679
        store.resolver_query_log_config_associations[id] = resolver_query_log_config_association
×
680

UNCOV
681
        return AssociateResolverQueryLogConfigResponse(
×
682
            ResolverQueryLogConfigAssociation=resolver_query_log_config_association
683
        )
684

685
    def disassociate_resolver_query_log_config(
1✔
686
        self,
687
        context: RequestContext,
688
        resolver_query_log_config_id: ResourceId,
689
        resource_id: ResourceId,
690
        **kwargs,
691
    ) -> DisassociateResolverQueryLogConfigResponse:
692
        store = self.get_store(context.account_id, context.region)
×
UNCOV
693
        resolver_query_log_config_association = store.delete_resolver_query_log_config_associations(
×
694
            resolver_query_log_config_id, resource_id
695
        )
696

UNCOV
697
        return DisassociateResolverQueryLogConfigResponse(
×
698
            ResolverQueryLogConfigAssociation=resolver_query_log_config_association
699
        )
700

701
    def get_resolver_query_log_config_association(
1✔
702
        self,
703
        context: RequestContext,
704
        resolver_query_log_config_association_id: ResourceId,
705
        **kwargs,
706
    ) -> GetResolverQueryLogConfigAssociationResponse:
UNCOV
707
        store = self.get_store(context.account_id, context.region)
×
708
        resolver_query_log_config_association: ResolverQueryLogConfigAssociation = (
×
709
            store.get_resolver_query_log_config_associations(
710
                resolver_query_log_config_association_id
711
            )
712
        )
UNCOV
713
        return GetResolverQueryLogConfigAssociationResponse(
×
714
            ResolverQueryLogConfigAssociation=resolver_query_log_config_association
715
        )
716

717
    def list_resolver_query_log_config_associations(
1✔
718
        self,
719
        context: RequestContext,
720
        max_results: MaxResults = None,
721
        next_token: NextToken = None,
722
        filters: Filters = None,
723
        sort_by: SortByKey = None,
724
        sort_order: SortOrder = None,
725
        **kwargs,
726
    ) -> ListResolverQueryLogConfigAssociationsResponse:
727
        store = self.get_store(context.account_id, context.region)
×
UNCOV
728
        resolver_query_log_config_associations = []
×
UNCOV
729
        for (
×
730
            resolver_query_log_config_association
731
        ) in store.resolver_query_log_config_associations.values():
UNCOV
732
            resolver_query_log_config_associations.append(
×
733
                ResolverQueryLogConfigAssociation(resolver_query_log_config_association)
734
            )
UNCOV
735
        return ListResolverQueryLogConfigAssociationsResponse(
×
736
            TotalCount=len(resolver_query_log_config_associations),
737
            ResolverQueryLogConfigAssociations=resolver_query_log_config_associations,
738
        )
739

740
    def get_firewall_config(
1✔
741
        self, context: RequestContext, resource_id: ResourceId, **kwargs
742
    ) -> GetFirewallConfigResponse:
UNCOV
743
        store = self.get_store(context.account_id, context.region)
×
UNCOV
744
        firewall_config = store.get_or_create_firewall_config(
×
745
            resource_id, context.region, context.account_id
746
        )
UNCOV
747
        return GetFirewallConfigResponse(FirewallConfig=firewall_config)
×
748

749
    def list_firewall_configs(
1✔
750
        self,
751
        context: RequestContext,
752
        max_results: ListFirewallConfigsMaxResult = None,
753
        next_token: NextToken = None,
754
        **kwargs,
755
    ) -> ListFirewallConfigsResponse:
756
        store = self.get_store(context.account_id, context.region)
×
757
        firewall_configs = []
×
758
        backend = get_ec2_backend(context.account_id, context.region)
×
759
        for vpc in backend.vpcs:
×
UNCOV
760
            if vpc not in store.firewall_configs:
×
UNCOV
761
                store.get_or_create_firewall_config(vpc, context.region, context.account_id)
×
UNCOV
762
        for firewall_config in store.firewall_configs.values():
×
UNCOV
763
            firewall_configs.append(select_from_typed_dict(FirewallConfig, firewall_config))
×
UNCOV
764
        return ListFirewallConfigsResponse(FirewallConfigs=firewall_configs)
×
765

766
    def update_firewall_config(
1✔
767
        self,
768
        context: RequestContext,
769
        resource_id: ResourceId,
770
        firewall_fail_open: FirewallFailOpenStatus,
771
        **kwargs,
772
    ) -> UpdateFirewallConfigResponse:
UNCOV
773
        store = self.get_store(context.account_id, context.region)
×
UNCOV
774
        backend = get_ec2_backend(context.account_id, context.region)
×
775
        for resource_id in backend.vpcs:
×
UNCOV
776
            if resource_id not in store.firewall_configs:
×
777
                firewall_config = store.get_or_create_firewall_config(
×
778
                    resource_id, context.region, context.account_id
779
                )
UNCOV
780
                firewall_config["FirewallFailOpen"] = firewall_fail_open
×
781
            else:
UNCOV
782
                firewall_config = store.firewall_configs[resource_id]
×
UNCOV
783
                firewall_config["FirewallFailOpen"] = firewall_fail_open
×
UNCOV
784
        return UpdateFirewallConfigResponse(FirewallConfig=firewall_config)
×
785

786

787
@patch(MotoRoute53ResolverBackend._matched_arn)
1✔
788
def Route53ResolverBackend_matched_arn(fn, self, resource_arn):
1✔
789
    """Given ARN, raise exception if there is no corresponding resource."""
790
    account_id = extract_account_id_from_arn(resource_arn)
1✔
791
    region_name = extract_region_from_arn(resource_arn)
1✔
792
    store = Route53ResolverProvider.get_store(account_id, region_name)
1✔
793

794
    for firewall_rule_group in store.firewall_rule_groups.values():
1✔
795
        if firewall_rule_group.get("Arn") == resource_arn:
×
796
            return
×
797
    for firewall_domain_list in store.firewall_domain_lists.values():
1✔
798
        if firewall_domain_list.get("Arn") == resource_arn:
1✔
799
            return
1✔
800
    for firewall_rule_group_association in store.firewall_rule_group_associations.values():
×
801
        if firewall_rule_group_association.get("Arn") == resource_arn:
×
UNCOV
802
            return
×
UNCOV
803
    for resolver_query_log_config in store.resolver_query_log_configs.values():
×
UNCOV
804
        if resolver_query_log_config.get("Arn") == resource_arn:
×
UNCOV
805
            return
×
UNCOV
806
    fn(self, resource_arn)
×
807

808

809
@patch(MotoRoute53ResolverBackend.disassociate_resolver_rule)
1✔
810
def moto_disassociate_resolver_rule(fn, self, resolver_rule_id, vpc_id):
1✔
811
    if resolver_rule_id not in self.resolver_rules:
1✔
812
        raise ResourceNotFoundException(
1✔
813
            f'[RSLVR-00703] Resolver rule with ID "{resolver_rule_id}" does not exist.'
814
        )
815
    return fn(self, resolver_rule_id, vpc_id)
1✔
816

817

818
@patch(MotoRoute53ResolverBackend.create_resolver_endpoint)
1✔
819
def moto_create_resolver_endpoint(fn, self, *args, **kwargs):
1✔
820
    for group_id in kwargs.get("security_group_ids"):
1✔
821
        if not group_id.startswith("sg-"):
1✔
822
            raise InvalidParameterException(
1✔
823
                f'[RSLVR-00408] Malformed security group ID: "Invalid id: "{group_id}" '
824
                f'(expecting "sg-...")".'
825
            )
826
    return fn(self, *args, **kwargs)
1✔
827

828

829
@patch(MotoRoute53ResolverBackend.delete_resolver_rule)
1✔
830
def moto_delete_resolver_endpoint(fn, self, resolver_rule_id):
1✔
831
    if resolver_rule_id not in self.resolver_rules:
1✔
832
        raise ResourceNotFoundException(
1✔
833
            f'[RSLVR-00703] Resolver rule with ID "{resolver_rule_id}" does not exist.'
834
        )
835
    return fn(self, resolver_rule_id)
1✔
836

837

838
@patch(MotoRoute53ResolverBackend.create_resolver_rule)
1✔
839
def moto_create_resolver_rule(fn, self, *args, **kwargs):
1✔
840
    direction = [
1✔
841
        x.direction
842
        for x in self.resolver_endpoints.values()
843
        if x.id == kwargs.get("resolver_endpoint_id")
844
    ]
845
    if direction and direction[0] == ResolverEndpointDirection.INBOUND:
1✔
846
        raise InvalidRequestException(
1✔
847
            "[RSLVR-00700] Resolver rules can only be associated to OUTBOUND resolver endpoints."
848
        )
849
    return fn(self, *args, **kwargs)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc