• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20449761985

22 Dec 2025 09:22PM UTC coverage: 86.912% (-0.008%) from 86.92%
20449761985

push

github

web-flow
APIGW: improve store typing (#13552)

9 of 9 new or added lines in 1 file covered. (100.0%)

130 existing lines in 7 files now uncovered.

70016 of 80560 relevant lines covered (86.91%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.72
/localstack-core/localstack/services/route53resolver/provider.py
1
from datetime import UTC, datetime
1✔
2

3
from moto.route53resolver.models import Route53ResolverBackend as MotoRoute53ResolverBackend
1✔
4
from moto.route53resolver.models import route53resolver_backends
1✔
5

6
import localstack.services.route53resolver.utils
1✔
7
from localstack.aws.api import RequestContext
1✔
8
from localstack.aws.api.route53resolver import (
1✔
9
    Action,
10
    AssociateFirewallRuleGroupResponse,
11
    AssociateResolverQueryLogConfigResponse,
12
    BlockOverrideDnsType,
13
    BlockOverrideDomain,
14
    BlockOverrideTtl,
15
    BlockResponse,
16
    ConfidenceThreshold,
17
    CreateFirewallDomainListResponse,
18
    CreateFirewallRuleGroupResponse,
19
    CreateFirewallRuleResponse,
20
    CreateResolverEndpointResponse,
21
    CreateResolverQueryLogConfigResponse,
22
    CreatorRequestId,
23
    DeleteFirewallDomainListResponse,
24
    DeleteFirewallRuleGroupResponse,
25
    DeleteFirewallRuleResponse,
26
    DeleteResolverQueryLogConfigResponse,
27
    DestinationArn,
28
    DisassociateFirewallRuleGroupResponse,
29
    DisassociateResolverQueryLogConfigResponse,
30
    DnsThreatProtection,
31
    Filters,
32
    FirewallConfig,
33
    FirewallDomainList,
34
    FirewallDomainListMetadata,
35
    FirewallDomainName,
36
    FirewallDomainRedirectionAction,
37
    FirewallDomains,
38
    FirewallDomainUpdateOperation,
39
    FirewallFailOpenStatus,
40
    FirewallRule,
41
    FirewallRuleGroup,
42
    FirewallRuleGroupAssociation,
43
    FirewallRuleGroupMetadata,
44
    GetFirewallConfigResponse,
45
    GetFirewallDomainListResponse,
46
    GetFirewallRuleGroupAssociationResponse,
47
    GetFirewallRuleGroupResponse,
48
    GetResolverQueryLogConfigAssociationResponse,
49
    GetResolverQueryLogConfigResponse,
50
    InvalidParameterException,
51
    InvalidRequestException,
52
    IpAddressesRequest,
53
    ListDomainMaxResults,
54
    ListFirewallConfigsMaxResult,
55
    ListFirewallConfigsResponse,
56
    ListFirewallDomainListsResponse,
57
    ListFirewallDomainsResponse,
58
    ListFirewallRuleGroupsResponse,
59
    ListFirewallRulesResponse,
60
    ListResolverQueryLogConfigAssociationsResponse,
61
    ListResolverQueryLogConfigsResponse,
62
    MaxResults,
63
    MutationProtectionStatus,
64
    Name,
65
    NextToken,
66
    OutpostArn,
67
    OutpostInstanceType,
68
    Priority,
69
    ProtocolList,
70
    Qtype,
71
    ResolverEndpointDirection,
72
    ResolverEndpointType,
73
    ResolverQueryLogConfig,
74
    ResolverQueryLogConfigAssociation,
75
    ResolverQueryLogConfigName,
76
    ResolverQueryLogConfigStatus,
77
    ResourceId,
78
    ResourceNotFoundException,
79
    RniEnhancedMetricsEnabled,
80
    Route53ResolverApi,
81
    SecurityGroupIds,
82
    SortByKey,
83
    SortOrder,
84
    TagList,
85
    TargetNameServerMetricsEnabled,
86
    UpdateFirewallConfigResponse,
87
    UpdateFirewallDomainsResponse,
88
    UpdateFirewallRuleGroupAssociationResponse,
89
    UpdateFirewallRuleResponse,
90
    ValidationException,
91
)
92
from localstack.services.ec2.models import get_ec2_backend
1✔
93
from localstack.services.moto import call_moto
1✔
94
from localstack.services.route53resolver.models import Route53ResolverStore, route53resolver_stores
1✔
95
from localstack.services.route53resolver.utils import (
1✔
96
    get_resolver_query_log_config_id,
97
    get_route53_resolver_firewall_domain_list_id,
98
    get_route53_resolver_firewall_rule_group_association_id,
99
    get_route53_resolver_firewall_rule_group_id,
100
    get_route53_resolver_query_log_config_association_id,
101
    validate_destination_arn,
102
    validate_mutation_protection,
103
    validate_priority,
104
)
105
from localstack.state import StateVisitor
1✔
106
from localstack.utils.aws import arns
1✔
107
from localstack.utils.aws.arns import extract_account_id_from_arn, extract_region_from_arn
1✔
108
from localstack.utils.collections import select_from_typed_dict
1✔
109
from localstack.utils.patch import patch
1✔
110

111

112
class Route53ResolverProvider(Route53ResolverApi):
1✔
113
    def accept_state_visitor(self, visitor: StateVisitor):
        """Hand this service's state containers to ``visitor``.

        The moto backends are visited first, followed by the LocalStack stores.
        """
        for state_container in (route53resolver_backends, route53resolver_stores):
            visitor.visit(state_container)
×
116

117
    @staticmethod
    def get_store(account_id: str, region: str) -> Route53ResolverStore:
        """Look up the Route53 Resolver store for the given account and region."""
        stores_for_account = route53resolver_stores[account_id]
        return stores_for_account[region]
1✔
120

121
    def create_firewall_rule_group(
        self,
        context: RequestContext,
        creator_request_id: CreatorRequestId,
        name: Name,
        tags: TagList = None,
        **kwargs,
    ) -> CreateFirewallRuleGroupResponse:
        """Create a Firewall Rule Group, register it in the store and tag it.

        The group is created with a rule count of zero and an empty rule mapping.
        Tags (or an empty list when none are given) are recorded through the tagger of
        the moto backend for the request's account and region.
        """
        account_id, region = context.account_id, context.region
        store = self.get_store(account_id, region)

        group_id = get_route53_resolver_firewall_rule_group_id()
        group_arn = arns.route53_resolver_firewall_rule_group_arn(group_id, account_id, region)

        rule_group = FirewallRuleGroup(
            Id=group_id,
            Arn=group_arn,
            Name=name,
            RuleCount=0,
            Status="COMPLETE",
            OwnerId=account_id,
            ShareStatus="NOT_SHARED",
            StatusMessage="Created Firewall Rule Group",
            CreatorRequestId=creator_request_id,
            CreationTime=datetime.now(UTC).isoformat(),
            ModificationTime=datetime.now(UTC).isoformat(),
        )

        store.firewall_rule_groups[group_id] = rule_group
        # Each rule group gets its own (initially empty) mapping of firewall rules.
        store.firewall_rules[group_id] = {}

        tagger = route53resolver_backends[account_id][region].tagger
        tagger.tag_resource(group_arn, tags or [])
        return CreateFirewallRuleGroupResponse(FirewallRuleGroup=rule_group)
1✔
154

155
    def delete_firewall_rule_group(
        self, context: RequestContext, firewall_rule_group_id: ResourceId, **kwargs
    ) -> DeleteFirewallRuleGroupResponse:
        """Delete a Firewall Rule Group.

        Delegates to the store's ``delete_firewall_rule_group`` and wraps its result in
        the response.
        """
        store = self.get_store(context.account_id, context.region)
        removed_group = store.delete_firewall_rule_group(firewall_rule_group_id)
        return DeleteFirewallRuleGroupResponse(FirewallRuleGroup=removed_group)
1✔
164

165
    def get_firewall_rule_group(
        self, context: RequestContext, firewall_rule_group_id: ResourceId, **kwargs
    ) -> GetFirewallRuleGroupResponse:
        """Return the details of a Firewall Rule Group.

        Delegates to the store's ``get_firewall_rule_group`` and wraps its result in
        the response.
        """
        store = self.get_store(context.account_id, context.region)
        found_group = store.get_firewall_rule_group(firewall_rule_group_id)
        return GetFirewallRuleGroupResponse(FirewallRuleGroup=found_group)
×
174

175
    def list_firewall_rule_groups(
        self,
        context: RequestContext,
        max_results: MaxResults = None,
        next_token: NextToken = None,
        **kwargs,
    ) -> ListFirewallRuleGroupsResponse:
        """List the metadata of all Firewall Rule Groups in the store.

        ``max_results`` and ``next_token`` are accepted but not used here, so every
        stored group is returned.
        """
        store = self.get_store(context.account_id, context.region)
        # Project each stored group down to the keys of the metadata shape.
        group_metadata = [
            select_from_typed_dict(FirewallRuleGroupMetadata, rule_group)
            for rule_group in store.firewall_rule_groups.values()
        ]
        return ListFirewallRuleGroupsResponse(FirewallRuleGroups=group_metadata)
×
190

191
    def create_firewall_domain_list(
        self,
        context: RequestContext,
        creator_request_id: CreatorRequestId,
        name: Name,
        tags: TagList = None,
        **kwargs,
    ) -> CreateFirewallDomainListResponse:
        """Create a Firewall Domain List, register it in the store and tag it.

        The list is created with a domain count of zero. Tags (or an empty list when
        none are given) are recorded through the tagger of the moto backend for the
        request's account and region.
        """
        account_id, region = context.account_id, context.region
        store = self.get_store(account_id, region)

        domain_list_id = get_route53_resolver_firewall_domain_list_id()
        domain_list_arn = arns.route53_resolver_firewall_domain_list_arn(
            domain_list_id, account_id, region
        )

        domain_list = FirewallDomainList(
            Id=domain_list_id,
            Arn=domain_list_arn,
            Name=name,
            DomainCount=0,
            Status="COMPLETE",
            StatusMessage="Created Firewall Domain List",
            ManagedOwnerName=account_id,
            CreatorRequestId=creator_request_id,
            CreationTime=datetime.now(UTC).isoformat(),
            ModificationTime=datetime.now(UTC).isoformat(),
        )
        store.firewall_domain_lists[domain_list_id] = domain_list

        tagger = route53resolver_backends[account_id][region].tagger
        tagger.tag_resource(domain_list_arn, tags or [])
        return CreateFirewallDomainListResponse(FirewallDomainList=domain_list)
1✔
220

221
    def delete_firewall_domain_list(
        self, context: RequestContext, firewall_domain_list_id: ResourceId, **kwargs
    ) -> DeleteFirewallDomainListResponse:
        """Delete a Firewall Domain List.

        Delegates to the store's ``delete_firewall_domain_list`` and wraps its result
        in the response.
        """
        store = self.get_store(context.account_id, context.region)
        removed_list = store.delete_firewall_domain_list(firewall_domain_list_id)
        return DeleteFirewallDomainListResponse(FirewallDomainList=removed_list)
1✔
230

231
    def get_firewall_domain_list(
        self, context: RequestContext, firewall_domain_list_id: ResourceId, **kwargs
    ) -> GetFirewallDomainListResponse:
        """Return the details of a Firewall Domain List.

        Delegates to the store's ``get_firewall_domain_list`` and wraps its result in
        the response.
        """
        store = self.get_store(context.account_id, context.region)
        found_list = store.get_firewall_domain_list(firewall_domain_list_id)
        return GetFirewallDomainListResponse(FirewallDomainList=found_list)
×
240

241
    def list_firewall_domain_lists(
        self,
        context: RequestContext,
        max_results: MaxResults = None,
        next_token: NextToken = None,
        **kwargs,
    ) -> ListFirewallDomainListsResponse:
        """List the metadata of all Firewall Domain Lists in the store.

        ``max_results`` and ``next_token`` are accepted but not used here, so every
        stored domain list is returned.
        """
        store = self.get_store(context.account_id, context.region)
        # Project each stored domain list down to the keys of the metadata shape.
        list_metadata = [
            select_from_typed_dict(FirewallDomainListMetadata, domain_list)
            for domain_list in store.firewall_domain_lists.values()
        ]
        return ListFirewallDomainListsResponse(FirewallDomainLists=list_metadata)
1✔
256

257
    def update_firewall_domains(
        self,
        context: RequestContext,
        firewall_domain_list_id: ResourceId,
        operation: FirewallDomainUpdateOperation,
        domains: FirewallDomains,
        **kwargs,
    ) -> UpdateFirewallDomainsResponse:
        """Update the domains in a Firewall Domain List.

        Supported operations:

        * ``ADD``: add ``domains`` to the domains already stored for the list (or store
          them as the initial domains when none are stored yet).
        * ``REMOVE``: remove each of ``domains`` from the stored domains.
        * ``REPLACE``: replace the stored domains with ``domains``.

        Afterwards the domain list's ``StatusMessage`` and ``ModificationTime`` are
        refreshed.

        Raises:
            ValidationException: If ``REMOVE`` names a domain that is not stored for
                the list. Domains listed before the unknown one have already been
                removed at that point.
        """
        store = self.get_store(context.account_id, context.region)

        firewall_domain_list: FirewallDomainList = store.get_firewall_domain_list(
            firewall_domain_list_id
        )
        firewall_domains = store.get_firewall_domain(firewall_domain_list_id)

        if operation == FirewallDomainUpdateOperation.ADD:
            if not firewall_domains:
                store.firewall_domains[firewall_domain_list_id] = domains
            else:
                # extend (not append): appending would nest the entire request list as a
                # single element instead of adding each domain individually
                store.firewall_domains[firewall_domain_list_id].extend(domains)

        if operation == FirewallDomainUpdateOperation.REMOVE:
            if firewall_domains:
                for domain in domains:
                    if domain in firewall_domains:
                        firewall_domains.remove(domain)
                    else:
                        raise ValidationException(
                            f"[RSLVR-02502] The following domains don't exist in the DNS Firewall domain list '{firewall_domain_list_id}'. You can't delete a domain that isn't in a domain list. Example unknown domain: '{domain}'. Trace Id: '{localstack.services.route53resolver.utils.get_trace_id()}'"
                        )

        if operation == FirewallDomainUpdateOperation.REPLACE:
            store.firewall_domains[firewall_domain_list_id] = domains

        firewall_domain_list["StatusMessage"] = "Finished domain list update"
        firewall_domain_list["ModificationTime"] = datetime.now(UTC).isoformat()
        return UpdateFirewallDomainsResponse(
            Id=firewall_domain_list.get("Id"),
            Name=firewall_domain_list.get("Name"),
            Status=firewall_domain_list.get("Status"),
            StatusMessage=firewall_domain_list.get("StatusMessage"),
        )
300

301
    def list_firewall_domains(
        self,
        context: RequestContext,
        firewall_domain_list_id: ResourceId,
        max_results: ListDomainMaxResults = None,
        next_token: NextToken = None,
        **kwargs,
    ) -> ListFirewallDomainsResponse:
        """List the domains stored for a DNS Firewall domain list.

        An empty list is returned when nothing is stored for the given id.
        ``max_results`` and ``next_token`` are accepted but not used here.
        """
        store = self.get_store(context.account_id, context.region)
        stored_domains = store.firewall_domains.get(firewall_domain_list_id)
        # A missing or empty entry both yield an empty result.
        domain_names = [FirewallDomainName(entry) for entry in stored_domains or []]
        return ListFirewallDomainsResponse(Domains=domain_names)
×
316

317
    def create_firewall_rule(
        self,
        context: RequestContext,
        creator_request_id: CreatorRequestId,
        firewall_rule_group_id: ResourceId,
        priority: Priority,
        action: Action,
        name: Name,
        firewall_domain_list_id: ResourceId = None,
        block_response: BlockResponse = None,
        block_override_domain: BlockOverrideDomain = None,
        block_override_dns_type: BlockOverrideDnsType = None,
        block_override_ttl: BlockOverrideTtl = None,
        firewall_domain_redirection_action: FirewallDomainRedirectionAction = None,
        qtype: Qtype = None,
        dns_threat_protection: DnsThreatProtection = None,
        confidence_threshold: ConfidenceThreshold = None,
        **kwargs,
    ) -> CreateFirewallRuleResponse:
        """Create a new firewall rule.

        The rule is stored under its rule group, keyed by ``firewall_domain_list_id``.
        When the rule group id is not known to the store, the rule is still returned in
        the response but is not stored.
        """
        # TODO add support for firewall_domain_list_id, dns_threat_protection, and confidence_threshold
        store = self.get_store(context.account_id, context.region)

        created_at = datetime.now(UTC).isoformat()
        modified_at = datetime.now(UTC).isoformat()
        new_rule = FirewallRule(
            FirewallRuleGroupId=firewall_rule_group_id,
            FirewallDomainListId=firewall_domain_list_id,
            Name=name,
            Priority=priority,
            Action=action,
            BlockResponse=block_response,
            BlockOverrideDomain=block_override_domain,
            BlockOverrideDnsType=block_override_dns_type,
            BlockOverrideTtl=block_override_ttl,
            CreatorRequestId=creator_request_id,
            CreationTime=created_at,
            ModificationTime=modified_at,
            FirewallDomainRedirectionAction=firewall_domain_redirection_action,
            Qtype=qtype,
        )

        rules_by_group = store.firewall_rules
        if firewall_rule_group_id in rules_by_group:
            rules_by_group[firewall_rule_group_id][firewall_domain_list_id] = new_rule
        # TODO: handle missing firewall-rule-group-id
        return CreateFirewallRuleResponse(FirewallRule=new_rule)
1✔
359

360
    def delete_firewall_rule(
        self,
        context: RequestContext,
        firewall_rule_group_id: ResourceId,
        firewall_domain_list_id: ResourceId = None,
        firewall_threat_protection_id: ResourceId = None,
        qtype: Qtype = None,
        **kwargs,
    ) -> DeleteFirewallRuleResponse:
        """Delete a firewall rule.

        The rule is addressed by rule group id and domain list id; the store's
        ``delete_firewall_rule`` result is wrapped in the response.
        ``firewall_threat_protection_id`` and ``qtype`` are accepted but not used here.
        """
        store = self.get_store(context.account_id, context.region)
        removed_rule = store.delete_firewall_rule(firewall_rule_group_id, firewall_domain_list_id)
        return DeleteFirewallRuleResponse(FirewallRule=removed_rule)
377

378
    def list_firewall_rules(
        self,
        context: RequestContext,
        firewall_rule_group_id: ResourceId,
        priority: Priority = None,
        action: Action = None,
        max_results: MaxResults = None,
        next_token: NextToken = None,
        **kwargs,
    ) -> ListFirewallRulesResponse:
        """List the firewall rules of a firewall rule group.

        When ``action`` and/or ``priority`` are given, only rules matching all given
        values are returned.

        Raises:
            ResourceNotFoundException: If no rule mapping exists for the provided
                firewall rule group id.
        """
        store = self.get_store(context.account_id, context.region)
        rules_in_group = store.firewall_rules.get(firewall_rule_group_id)
        if rules_in_group is None:
            raise ResourceNotFoundException(
                f"Can't find the resource with ID '{firewall_rule_group_id}'. Trace Id: '{localstack.services.route53resolver.utils.get_trace_id()}'"
            )

        matching_rules = []
        for rule in rules_in_group.values():
            # A filter that was not supplied (None) matches every rule.
            if action is not None and action != rule["Action"]:
                continue
            if priority is not None and priority != rule["Priority"]:
                continue
            matching_rules.append(FirewallRule(rule))

        # TODO: implement max_results filtering and next_token handling
        return ListFirewallRulesResponse(FirewallRules=matching_rules)
1✔
411

412
    def update_firewall_rule(
        self,
        context: RequestContext,
        firewall_rule_group_id: ResourceId,
        firewall_domain_list_id: ResourceId = None,
        firewall_threat_protection_id: ResourceId = None,
        priority: Priority = None,
        action: Action = None,
        block_response: BlockResponse = None,
        block_override_domain: BlockOverrideDomain = None,
        block_override_dns_type: BlockOverrideDnsType = None,
        block_override_ttl: BlockOverrideTtl = None,
        name: Name = None,
        firewall_domain_redirection_action: FirewallDomainRedirectionAction = None,
        qtype: Qtype = None,
        dns_threat_protection: DnsThreatProtection = None,
        confidence_threshold: ConfidenceThreshold = None,
        **kwargs,
    ) -> UpdateFirewallRuleResponse:
        """Update a firewall rule in place.

        The rule is looked up through the store by rule group id and domain list id.
        Every supplied (non-``None``) setting overwrites the corresponding field of the
        stored rule; omitted settings are left untouched.
        ``firewall_threat_protection_id``, ``dns_threat_protection`` and
        ``confidence_threshold`` are accepted but not used here.
        """
        store = self.get_store(context.account_id, context.region)
        firewall_rule: FirewallRule = store.get_firewall_rule(
            firewall_rule_group_id, firewall_domain_list_id
        )

        requested_updates = {
            "Priority": priority,
            "Action": action,
            "BlockResponse": block_response,
            "BlockOverrideDomain": block_override_domain,
            "BlockOverrideDnsType": block_override_dns_type,
            "BlockOverrideTtl": block_override_ttl,
            "Name": name,
            "FirewallDomainRedirectionAction": firewall_domain_redirection_action,
            "Qtype": qtype,
        }
        # Compare against None rather than relying on truthiness, so that legitimate
        # falsy values (e.g. a BlockOverrideTtl of 0) are not silently dropped.
        for field_name, new_value in requested_updates.items():
            if new_value is not None:
                firewall_rule[field_name] = new_value

        return UpdateFirewallRuleResponse(
            FirewallRule=firewall_rule,
        )
458

459
    def associate_firewall_rule_group(
        self,
        context: RequestContext,
        creator_request_id: CreatorRequestId,
        firewall_rule_group_id: ResourceId,
        vpc_id: ResourceId,
        priority: Priority,
        name: Name,
        mutation_protection: MutationProtectionStatus = None,
        tags: TagList = None,
        **kwargs,
    ) -> AssociateFirewallRuleGroupResponse:
        """Associate a firewall rule group with a VPC.

        ``priority`` and ``mutation_protection`` are passed through the validation
        helpers first. Mutation protection defaults to ``"DISABLED"`` when not given.
        The new association is stored and tagged.

        Raises:
            ValidationException: If an association for the same VPC and rule group is
                already stored.
        """
        account_id, region = context.account_id, context.region
        store = self.get_store(account_id, region)
        validate_priority(priority=priority)
        validate_mutation_protection(mutation_protection=mutation_protection)

        is_duplicate = any(
            existing.get("VpcId") == vpc_id
            and existing.get("FirewallRuleGroupId") == firewall_rule_group_id
            for existing in store.firewall_rule_group_associations.values()
        )
        if is_duplicate:
            raise ValidationException(
                f"[RSLVR-02302] This DNS Firewall rule group can't be associated to a VPC: '{vpc_id}'. It is already associated to VPC '{firewall_rule_group_id}'. Try again with another VPC or DNS Firewall rule group. Trace Id: '{localstack.services.route53resolver.utils.get_trace_id()}'"
            )

        association_id = get_route53_resolver_firewall_rule_group_association_id()
        association_arn = arns.route53_resolver_firewall_rule_group_associations_arn(
            association_id, account_id, region
        )

        association = FirewallRuleGroupAssociation(
            Id=association_id,
            Arn=association_arn,
            FirewallRuleGroupId=firewall_rule_group_id,
            VpcId=vpc_id,
            Name=name,
            Priority=priority,
            MutationProtection=mutation_protection or "DISABLED",
            Status="COMPLETE",
            StatusMessage="Creating Firewall Rule Group Association",
            CreatorRequestId=creator_request_id,
            CreationTime=datetime.now(UTC).isoformat(),
            ModificationTime=datetime.now(UTC).isoformat(),
        )
        store.firewall_rule_group_associations[association_id] = association

        tagger = route53resolver_backends[account_id][region].tagger
        tagger.tag_resource(association_arn, tags or [])
        return AssociateFirewallRuleGroupResponse(FirewallRuleGroupAssociation=association)
512

513
    def disassociate_firewall_rule_group(
        self, context: RequestContext, firewall_rule_group_association_id: ResourceId, **kwargs
    ) -> DisassociateFirewallRuleGroupResponse:
        """Disassociate a DNS Firewall rule group from a VPC.

        Delegates to the store's ``delete_firewall_rule_group_association`` and wraps
        its result in the response.
        """
        store = self.get_store(context.account_id, context.region)
        removed_association = store.delete_firewall_rule_group_association(
            firewall_rule_group_association_id
        )
        return DisassociateFirewallRuleGroupResponse(
            FirewallRuleGroupAssociation=removed_association
        )
524

525
    def get_firewall_rule_group_association(
        self, context: RequestContext, firewall_rule_group_association_id: ResourceId, **kwargs
    ) -> GetFirewallRuleGroupAssociationResponse:
        """Return the specified Firewall Rule Group Association.

        Delegates to the store's ``get_firewall_rule_group_association`` and wraps its
        result in the response.
        """
        store = self.get_store(context.account_id, context.region)
        found_association = store.get_firewall_rule_group_association(
            firewall_rule_group_association_id
        )
        return GetFirewallRuleGroupAssociationResponse(
            FirewallRuleGroupAssociation=found_association
        )
536

537
    def update_firewall_rule_group_association(
        self,
        context: RequestContext,
        firewall_rule_group_association_id: ResourceId,
        priority: Priority = None,
        mutation_protection: MutationProtectionStatus = None,
        name: Name = None,
        **kwargs,
    ) -> UpdateFirewallRuleGroupAssociationResponse:
        """Update the specified Firewall Rule Group Association in place.

        ``priority`` and ``mutation_protection`` are passed through the validation
        helpers before the association is looked up. Only truthy values overwrite the
        stored ``Priority``, ``MutationProtection`` and ``Name`` fields.
        """
        store = self.get_store(context.account_id, context.region)
        validate_priority(priority=priority)
        validate_mutation_protection(mutation_protection=mutation_protection)

        association = store.get_firewall_rule_group_association(
            firewall_rule_group_association_id
        )

        candidate_updates = (
            ("Priority", priority),
            ("MutationProtection", mutation_protection),
            ("Name", name),
        )
        for field_name, new_value in candidate_updates:
            if new_value:
                association[field_name] = new_value

        return UpdateFirewallRuleGroupAssociationResponse(
            FirewallRuleGroupAssociation=association
        )
565

566
    def create_resolver_query_log_config(
        self,
        context: RequestContext,
        name: ResolverQueryLogConfigName,
        destination_arn: DestinationArn,
        creator_request_id: CreatorRequestId,
        tags: TagList = None,
        **kwargs,
    ) -> CreateResolverQueryLogConfigResponse:
        """Create a Resolver query log config, register it in the store and tag it.

        ``destination_arn`` is passed through ``validate_destination_arn`` before
        anything is created. The config starts with an association count of zero.
        """
        account_id, region = context.account_id, context.region
        store = self.get_store(account_id, region)
        validate_destination_arn(destination_arn)

        config_id = get_resolver_query_log_config_id()
        config_arn = arns.route53_resolver_query_log_config_arn(config_id, account_id, region)

        query_log_config = ResolverQueryLogConfig(
            Id=config_id,
            Arn=config_arn,
            Name=name,
            AssociationCount=0,
            Status="CREATED",
            OwnerId=account_id,
            ShareStatus="NOT_SHARED",
            DestinationArn=destination_arn,
            CreatorRequestId=creator_request_id,
            CreationTime=datetime.now(UTC).isoformat(),
        )
        store.resolver_query_log_configs[config_id] = query_log_config

        tagger = route53resolver_backends[account_id][region].tagger
        tagger.tag_resource(config_arn, tags or [])
        return CreateResolverQueryLogConfigResponse(ResolverQueryLogConfig=query_log_config)
598

599
    def create_resolver_endpoint(
        self,
        context: RequestContext,
        creator_request_id: CreatorRequestId,
        security_group_ids: SecurityGroupIds,
        direction: ResolverEndpointDirection,
        ip_addresses: IpAddressesRequest,
        name: Name | None = None,
        outpost_arn: OutpostArn | None = None,
        preferred_instance_type: OutpostInstanceType | None = None,
        tags: TagList | None = None,
        resolver_endpoint_type: ResolverEndpointType | None = None,
        protocols: ProtocolList | None = None,
        rni_enhanced_metrics_enabled: RniEnhancedMetricsEnabled | None = None,
        target_name_server_metrics_enabled: TargetNameServerMetricsEnabled | None = None,
        **kwargs,
    ) -> CreateResolverEndpointResponse:
        """Create a resolver endpoint via moto and report it as still being created.

        The request is forwarded to moto unchanged; only the ``Status`` of the returned
        endpoint is overwritten before the response is handed back.
        """
        moto_response = call_moto(context)
        endpoint = moto_response["ResolverEndpoint"]
        # NOTE(review): this enum is named after query log configs, not endpoints; only
        # its CREATING member is used here - confirm whether an endpoint-specific status
        # enum should be used instead.
        endpoint["Status"] = ResolverQueryLogConfigStatus.CREATING
        return moto_response
1✔
621

622
    def get_resolver_query_log_config(
        self, context: RequestContext, resolver_query_log_config_id: ResourceId, **kwargs
    ) -> GetResolverQueryLogConfigResponse:
        """Return the specified Resolver query log config.

        Delegates to the store's ``get_resolver_query_log_config`` and wraps its result
        in the response.
        """
        store = self.get_store(context.account_id, context.region)
        found_config = store.get_resolver_query_log_config(resolver_query_log_config_id)
        return GetResolverQueryLogConfigResponse(ResolverQueryLogConfig=found_config)
×
630

631
    def delete_resolver_query_log_config(
        self, context: RequestContext, resolver_query_log_config_id: ResourceId, **kwargs
    ) -> DeleteResolverQueryLogConfigResponse:
        """Delete the specified Resolver query log config.

        Delegates to the store's ``delete_resolver_query_log_config`` and wraps its
        result in the response.
        """
        store = self.get_store(context.account_id, context.region)
        removed_config = store.delete_resolver_query_log_config(resolver_query_log_config_id)
        return DeleteResolverQueryLogConfigResponse(ResolverQueryLogConfig=removed_config)
641

642
    def list_resolver_query_log_configs(
        self,
        context: RequestContext,
        max_results: MaxResults = None,
        next_token: NextToken = None,
        filters: Filters = None,
        sort_by: SortByKey = None,
        sort_order: SortOrder = None,
        **kwargs,
    ) -> ListResolverQueryLogConfigsResponse:
        """List all Resolver query log configs in the store, with their total count.

        The paging, filter and sort parameters are accepted but not used here, so every
        stored config is returned.
        """
        store = self.get_store(context.account_id, context.region)
        # Return copies so the response does not hand out the stored dicts themselves.
        config_copies = [
            ResolverQueryLogConfig(stored_config)
            for stored_config in store.resolver_query_log_configs.values()
        ]
        return ListResolverQueryLogConfigsResponse(
            ResolverQueryLogConfigs=config_copies,
            TotalCount=len(config_copies),
        )
660

661
    def associate_resolver_query_log_config(
        self,
        context: RequestContext,
        resolver_query_log_config_id: ResourceId,
        resource_id: ResourceId,
        **kwargs,
    ) -> AssociateResolverQueryLogConfigResponse:
        """Associate a Resolver query log config with a resource.

        A new association record with status ``ACTIVE`` is created under a freshly
        generated id and stored; the record is returned in the response.
        """
        store = self.get_store(context.account_id, context.region)
        association_id = get_route53_resolver_query_log_config_association_id()

        association = ResolverQueryLogConfigAssociation(
            Id=association_id,
            ResolverQueryLogConfigId=resolver_query_log_config_id,
            ResourceId=resource_id,
            Status="ACTIVE",
            Error="NONE",
            ErrorMessage="",
            CreationTime=datetime.now(UTC).isoformat(),
        )
        store.resolver_query_log_config_associations[association_id] = association

        return AssociateResolverQueryLogConfigResponse(
            ResolverQueryLogConfigAssociation=association
        )
688

689
    def disassociate_resolver_query_log_config(
        self,
        context: RequestContext,
        resolver_query_log_config_id: ResourceId,
        resource_id: ResourceId,
        **kwargs,
    ) -> DisassociateResolverQueryLogConfigResponse:
        """Remove the association between a query log config and a resource.

        The removal itself is delegated to the account/region store.

        :param context: request context supplying the account ID and region
        :param resolver_query_log_config_id: ID of the associated query log config
        :param resource_id: ID of the associated resource
        :return: response wrapping the value the store returns for the removal
        """
        account_store = self.get_store(context.account_id, context.region)
        removed_association = account_store.delete_resolver_query_log_config_associations(
            resolver_query_log_config_id, resource_id
        )
        return DisassociateResolverQueryLogConfigResponse(
            ResolverQueryLogConfigAssociation=removed_association
        )
704

705
    def get_resolver_query_log_config_association(
        self,
        context: RequestContext,
        resolver_query_log_config_association_id: ResourceId,
        **kwargs,
    ) -> GetResolverQueryLogConfigAssociationResponse:
        """Fetch a single query log config association by its ID.

        :param context: request context supplying the account ID and region
        :param resolver_query_log_config_association_id: ID of the association to fetch
        :return: response wrapping the association returned by the store
        """
        account_store = self.get_store(context.account_id, context.region)
        association: ResolverQueryLogConfigAssociation = (
            account_store.get_resolver_query_log_config_associations(
                resolver_query_log_config_association_id
            )
        )
        return GetResolverQueryLogConfigAssociationResponse(
            ResolverQueryLogConfigAssociation=association
        )
720

721
    def list_resolver_query_log_config_associations(
        self,
        context: RequestContext,
        max_results: MaxResults = None,
        next_token: NextToken = None,
        filters: Filters = None,
        sort_by: SortByKey = None,
        sort_order: SortOrder = None,
        **kwargs,
    ) -> ListResolverQueryLogConfigAssociationsResponse:
        """List every query log config association in the account/region store.

        The pagination, filter and sort parameters are accepted but not applied
        here; all stored associations are returned in one response.

        :param context: request context supplying the account ID and region
        :return: response with the associations and their total count
        """
        account_store = self.get_store(context.account_id, context.region)
        associations = [
            ResolverQueryLogConfigAssociation(stored_association)
            for stored_association in account_store.resolver_query_log_config_associations.values()
        ]
        return ListResolverQueryLogConfigAssociationsResponse(
            TotalCount=len(associations),
            ResolverQueryLogConfigAssociations=associations,
        )
743

744
    def get_firewall_config(
        self, context: RequestContext, resource_id: ResourceId, **kwargs
    ) -> GetFirewallConfigResponse:
        """Return the firewall config for a resource via the store's get-or-create helper.

        :param context: request context supplying the account ID and region
        :param resource_id: ID of the resource whose firewall config is requested
        :return: response wrapping the firewall config returned by the store
        """
        account_store = self.get_store(context.account_id, context.region)
        config = account_store.get_or_create_firewall_config(
            resource_id, context.region, context.account_id
        )
        return GetFirewallConfigResponse(FirewallConfig=config)
×
752

753
    def list_firewall_configs(
        self,
        context: RequestContext,
        max_results: ListFirewallConfigsMaxResult = None,
        next_token: NextToken = None,
        **kwargs,
    ) -> ListFirewallConfigsResponse:
        """List the firewall configs of the account/region.

        Before listing, every VPC in the EC2 backend that has no entry in the
        store's firewall configs is passed to the store's get-or-create helper.
        ``max_results`` and ``next_token`` are accepted but not applied here.

        :param context: request context supplying the account ID and region
        :return: response with all firewall configs held in the store
        """
        account_store = self.get_store(context.account_id, context.region)
        ec2_backend = get_ec2_backend(context.account_id, context.region)

        for vpc_id in ec2_backend.vpcs:
            if vpc_id in account_store.firewall_configs:
                continue
            account_store.get_or_create_firewall_config(
                vpc_id, context.region, context.account_id
            )

        configs = [
            select_from_typed_dict(FirewallConfig, stored_config)
            for stored_config in account_store.firewall_configs.values()
        ]
        return ListFirewallConfigsResponse(FirewallConfigs=configs)
×
769

770
    def update_firewall_config(
        self,
        context: RequestContext,
        resource_id: ResourceId,
        firewall_fail_open: FirewallFailOpenStatus,
        **kwargs,
    ) -> UpdateFirewallConfigResponse:
        """Set the fail-open behaviour of the firewall config of one resource.

        Only the config belonging to ``resource_id`` is modified. The previous
        implementation looped over every VPC of the EC2 backend with a loop
        variable that shadowed ``resource_id``; it therefore overwrote the
        setting on all VPCs, returned whichever config was iterated last, and
        raised ``UnboundLocalError`` when the backend had no VPCs.

        :param context: request context supplying the account ID and region
        :param resource_id: ID of the resource whose firewall config is updated
        :param firewall_fail_open: new value for the ``FirewallFailOpen`` field
        :return: response wrapping the updated firewall config
        """
        store = self.get_store(context.account_id, context.region)
        # Same lookup as get_firewall_config, so both operations agree on which
        # config belongs to a resource.
        # NOTE(review): any validation of resource_id is left to
        # get_or_create_firewall_config - confirm it rejects unknown VPCs.
        firewall_config = store.get_or_create_firewall_config(
            resource_id, context.region, context.account_id
        )
        firewall_config["FirewallFailOpen"] = firewall_fail_open
        return UpdateFirewallConfigResponse(FirewallConfig=firewall_config)
×
789

790

791
@patch(MotoRoute53ResolverBackend._matched_arn)
def Route53ResolverBackend_matched_arn(fn, self, resource_arn):
    """Given ARN, raise exception if there is no corresponding resource.

    The LocalStack store for the account and region encoded in the ARN is
    searched first; the patched moto method ``fn`` is called only when none
    of the store's collections holds a resource with that ARN.
    """
    store = Route53ResolverProvider.get_store(
        extract_account_id_from_arn(resource_arn),
        extract_region_from_arn(resource_arn),
    )

    def _holds_arn(resources) -> bool:
        # True when any resource in the mapping carries the requested ARN.
        return any(resource.get("Arn") == resource_arn for resource in resources.values())

    # `or` short-circuits, so the collections are checked in the same order as
    # before and later ones are not touched once a match is found.
    if (
        _holds_arn(store.firewall_rule_groups)
        or _holds_arn(store.firewall_domain_lists)
        or _holds_arn(store.firewall_rule_group_associations)
        or _holds_arn(store.resolver_query_log_configs)
    ):
        return
    fn(self, resource_arn)
×
811

812

813
@patch(MotoRoute53ResolverBackend.disassociate_resolver_rule)
def moto_disassociate_resolver_rule(fn, self, resolver_rule_id, vpc_id):
    """Delegate to moto's ``disassociate_resolver_rule`` for known rule IDs only.

    :raises ResourceNotFoundException: if ``resolver_rule_id`` is not among the
        backend's resolver rules
    """
    if resolver_rule_id in self.resolver_rules:
        return fn(self, resolver_rule_id, vpc_id)
    raise ResourceNotFoundException(
        f'[RSLVR-00703] Resolver rule with ID "{resolver_rule_id}" does not exist.'
    )
1✔
820

821

822
@patch(MotoRoute53ResolverBackend.create_resolver_endpoint)
def moto_create_resolver_endpoint(fn, self, *args, **kwargs):
    """Reject malformed security group IDs, then delegate to moto.

    Only the ``security_group_ids`` keyword argument is inspected; every ID in
    it must start with ``sg-``.

    :raises InvalidParameterException: if a security group ID does not start
        with ``sg-``
    """
    # kwargs.get() returns None when the argument is omitted or passed
    # positionally; fall back to an empty list so the loop does not raise
    # TypeError and the call still reaches moto.
    for group_id in kwargs.get("security_group_ids") or []:
        if not group_id.startswith("sg-"):
            raise InvalidParameterException(
                f'[RSLVR-00408] Malformed security group ID: "Invalid id: "{group_id}" '
                f'(expecting "sg-...")".'
            )
    return fn(self, *args, **kwargs)
1✔
831

832

833
@patch(MotoRoute53ResolverBackend.delete_resolver_rule)
def moto_delete_resolver_endpoint(fn, self, resolver_rule_id):
    """Delegate to moto's ``delete_resolver_rule`` for known rule IDs only.

    NOTE(review): despite its name, this function patches
    ``delete_resolver_rule``, not an endpoint operation.

    :raises ResourceNotFoundException: if ``resolver_rule_id`` is not among the
        backend's resolver rules
    """
    if resolver_rule_id in self.resolver_rules:
        return fn(self, resolver_rule_id)
    raise ResourceNotFoundException(
        f'[RSLVR-00703] Resolver rule with ID "{resolver_rule_id}" does not exist.'
    )
1✔
840

841

842
@patch(MotoRoute53ResolverBackend.create_resolver_rule)
def moto_create_resolver_rule(fn, self, *args, **kwargs):
    """Refuse resolver rules that target an INBOUND endpoint, then delegate to moto.

    The endpoint is looked up by the ``resolver_endpoint_id`` keyword argument;
    when no endpoint of the backend has that ID the call is passed through.

    :raises InvalidRequestException: if the first matching endpoint's direction
        is INBOUND
    """
    endpoint_id = kwargs.get("resolver_endpoint_id")
    # Direction of the first endpoint with the requested ID, or None if absent.
    matched_direction = next(
        (
            endpoint.direction
            for endpoint in self.resolver_endpoints.values()
            if endpoint.id == endpoint_id
        ),
        None,
    )
    if matched_direction and matched_direction == ResolverEndpointDirection.INBOUND:
        raise InvalidRequestException(
            "[RSLVR-00700] Resolver rules can only be associated to OUTBOUND resolver endpoints."
        )
    return fn(self, *args, **kwargs)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc