• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19844934392

01 Dec 2025 07:55PM UTC coverage: 86.945% (+0.1%) from 86.821%
19844934392

push

github

web-flow
Update ASF APIs, provider signatures, disable lambda patches (#13444)

Co-authored-by: LocalStack Bot <localstack-bot@users.noreply.github.com>
Co-authored-by: Silvio Vasiljevic <silvio.vasiljevic@gmail.com>

69707 of 80174 relevant lines covered (86.94%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.42
/localstack-core/localstack/services/ec2/provider.py
1
import copy
1✔
2
import json
1✔
3
import logging
1✔
4
import re
1✔
5
from abc import ABC
1✔
6
from datetime import UTC, datetime
1✔
7

8
from botocore.parsers import ResponseParserError
1✔
9
from moto.core.utils import camelcase_to_underscores, underscores_to_camelcase
1✔
10
from moto.ec2.exceptions import InvalidVpcEndPointIdError
1✔
11
from moto.ec2.models import (
1✔
12
    EC2Backend,
13
    FlowLogsBackend,
14
    SubnetBackend,
15
    TransitGatewayAttachmentBackend,
16
    VPCBackend,
17
    ec2_backends,
18
)
19
from moto.ec2.models.launch_templates import LaunchTemplate as MotoLaunchTemplate
1✔
20
from moto.ec2.models.subnets import Subnet
1✔
21

22
from localstack.aws.api import CommonServiceException, RequestContext, handler
1✔
23
from localstack.aws.api.ec2 import (
1✔
24
    AvailabilityZone,
25
    Boolean,
26
    CreateFlowLogsRequest,
27
    CreateFlowLogsResult,
28
    CreateLaunchTemplateRequest,
29
    CreateLaunchTemplateResult,
30
    CreateSubnetRequest,
31
    CreateSubnetResult,
32
    CreateTransitGatewayRequest,
33
    CreateTransitGatewayResult,
34
    CurrencyCodeValues,
35
    DescribeAvailabilityZonesRequest,
36
    DescribeAvailabilityZonesResult,
37
    DescribeReservedInstancesOfferingsRequest,
38
    DescribeReservedInstancesOfferingsResult,
39
    DescribeReservedInstancesRequest,
40
    DescribeReservedInstancesResult,
41
    DescribeSubnetsRequest,
42
    DescribeSubnetsResult,
43
    DescribeTransitGatewaysRequest,
44
    DescribeTransitGatewaysResult,
45
    DescribeVpcEndpointServicesRequest,
46
    DescribeVpcEndpointServicesResult,
47
    DescribeVpcEndpointsRequest,
48
    DescribeVpcEndpointsResult,
49
    DnsOptions,
50
    DnsOptionsSpecification,
51
    DnsRecordIpType,
52
    Ec2Api,
53
    GetSecurityGroupsForVpcRequest,
54
    GetSecurityGroupsForVpcResult,
55
    InstanceType,
56
    IpAddressType,
57
    LaunchTemplate,
58
    ModifyLaunchTemplateRequest,
59
    ModifyLaunchTemplateResult,
60
    ModifySubnetAttributeRequest,
61
    ModifyVpcEndpointResult,
62
    OfferingClassType,
63
    OfferingTypeValues,
64
    PricingDetail,
65
    PurchaseReservedInstancesOfferingRequest,
66
    PurchaseReservedInstancesOfferingResult,
67
    RecurringCharge,
68
    RecurringChargeFrequency,
69
    ReservedInstances,
70
    ReservedInstancesOffering,
71
    ReservedInstanceState,
72
    RevokeSecurityGroupEgressRequest,
73
    RevokeSecurityGroupEgressResult,
74
    RIProductDescription,
75
    SecurityGroupForVpc,
76
    String,
77
    SubnetConfigurationsList,
78
    Tenancy,
79
    UnsuccessfulItem,
80
    UnsuccessfulItemError,
81
    VpcEndpointId,
82
    VpcEndpointRouteTableIdList,
83
    VpcEndpointSecurityGroupIdList,
84
    VpcEndpointSubnetIdList,
85
    scope,
86
)
87
from localstack.aws.connect import connect_to
1✔
88
from localstack.services.ec2.exceptions import (
1✔
89
    InvalidLaunchTemplateIdError,
90
    InvalidLaunchTemplateNameError,
91
    MissingParameterError,
92
)
93
from localstack.services.ec2.models import get_ec2_backend
1✔
94
from localstack.services.ec2.patches import apply_patches
1✔
95
from localstack.services.moto import call_moto, call_moto_with_request
1✔
96
from localstack.services.plugins import ServiceLifecycleHook
1✔
97
from localstack.state import StateVisitor
1✔
98
from localstack.utils.patch import patch
1✔
99
from localstack.utils.strings import first_char_to_upper, long_uid, short_uid
1✔
100

101
LOG = logging.getLogger(__name__)
1✔
102

103
# additional subnet attributes not yet supported upstream
104
ADDITIONAL_SUBNET_ATTRS = ("private_dns_name_options_on_launch", "enable_dns64")
1✔
105

106

107
class Ec2Provider(Ec2Api, ABC, ServiceLifecycleHook):
1✔
108
    def on_after_init(self):
        """Lifecycle hook: apply the EC2 patches from ``localstack.services.ec2.patches``."""
        apply_patches()
1✔
110

111
    def accept_state_visitor(self, visitor: StateVisitor):
        """Hand the moto EC2 backends to the given state visitor."""
        # imported locally, as in the original implementation
        from moto.ec2 import models as moto_ec2_models

        visitor.visit(moto_ec2_models.ec2_backends)
×
115

116
    @handler("DescribeAvailabilityZones", expand=False)
    def describe_availability_zones(
        self,
        context: RequestContext,
        describe_availability_zones_request: DescribeAvailabilityZonesRequest,
    ) -> DescribeAvailabilityZonesResult:
        """Describe availability zones, filtering locally by zone name and/or zone ID.

        When neither ``ZoneNames`` nor ``ZoneIds`` is given, the request is forwarded to moto
        unchanged. Otherwise the zones are looked up on the EC2 backend and converted into
        ``AvailabilityZone`` entries with state ``available``.
        """
        backend = get_ec2_backend(context.account_id, context.region)
        names = describe_availability_zones_request.get("ZoneNames")
        ids = describe_availability_zones_request.get("ZoneIds")

        # no filter requested: let moto build the full response
        if not (names or ids):
            return call_moto(context)

        matching_zones = []
        for zone in backend.describe_availability_zones(zone_names=names, zone_ids=ids):
            matching_zones.append(
                AvailabilityZone(
                    State="available",
                    Messages=[],
                    RegionName=zone.region_name,
                    ZoneName=zone.name,
                    ZoneId=zone.zone_id,
                    ZoneType=zone.zone_type,
                )
            )
        return DescribeAvailabilityZonesResult(AvailabilityZones=matching_zones)
1✔
142

143
    @handler("DescribeReservedInstancesOfferings", expand=False)
    def describe_reserved_instances_offerings(
        self,
        context: RequestContext,
        describe_reserved_instances_offerings_request: DescribeReservedInstancesOfferingsRequest,
    ) -> DescribeReservedInstancesOfferingsResult:
        """Return a single hard-coded reserved instances offering.

        The request is not inspected; only the offering ID is generated anew on each call.
        """
        hourly_charge = RecurringCharge(Amount=0.25, Frequency=RecurringChargeFrequency.Hourly)
        pricing = PricingDetail(Price=0.0, Count=3)
        offering = ReservedInstancesOffering(
            AvailabilityZone="eu-central-1a",
            Duration=2628000,
            FixedPrice=0.0,
            InstanceType=InstanceType.t2_small,
            ProductDescription=RIProductDescription.Linux_UNIX,
            ReservedInstancesOfferingId=long_uid(),
            UsagePrice=0.0,
            CurrencyCode=CurrencyCodeValues.USD,
            InstanceTenancy=Tenancy.default,
            Marketplace=True,
            PricingDetails=[pricing],
            RecurringCharges=[hourly_charge],
            Scope=scope.Availability_Zone,
        )
        return DescribeReservedInstancesOfferingsResult(ReservedInstancesOfferings=[offering])
170

171
    @handler("DescribeReservedInstances", expand=False)
    def describe_reserved_instances(
        self,
        context: RequestContext,
        describe_reserved_instances_request: DescribeReservedInstancesRequest,
    ) -> DescribeReservedInstancesResult:
        """Return a single hard-coded reserved instances entry.

        The request is not inspected; only the reservation ID is generated anew on each call.
        """
        hourly_charge = RecurringCharge(Amount=0.05, Frequency=RecurringChargeFrequency.Hourly)
        reservation = ReservedInstances(
            AvailabilityZone="eu-central-1a",
            Duration=2628000,
            End=datetime(2016, 6, 30, tzinfo=UTC),
            FixedPrice=0.0,
            InstanceCount=2,
            InstanceType=InstanceType.t2_small,
            ProductDescription=RIProductDescription.Linux_UNIX,
            ReservedInstancesId=long_uid(),
            Start=datetime(2016, 1, 1, tzinfo=UTC),
            State=ReservedInstanceState.active,
            UsagePrice=0.05,
            CurrencyCode=CurrencyCodeValues.USD,
            InstanceTenancy=Tenancy.default,
            OfferingClass=OfferingClassType.standard,
            OfferingType=OfferingTypeValues.Partial_Upfront,
            RecurringCharges=[hourly_charge],
            Scope=scope.Availability_Zone,
        )
        return DescribeReservedInstancesResult(ReservedInstances=[reservation])
202

203
    @handler("PurchaseReservedInstancesOffering", expand=False)
    def purchase_reserved_instances_offering(
        self,
        context: RequestContext,
        purchase_reserved_instances_offerings_request: PurchaseReservedInstancesOfferingRequest,
    ) -> PurchaseReservedInstancesOfferingResult:
        """Return a freshly generated reserved instances ID; the request is not inspected."""
        reservation_id = long_uid()
        return PurchaseReservedInstancesOfferingResult(ReservedInstancesId=reservation_id)
212

213
    @handler("ModifyVpcEndpoint")
    def modify_vpc_endpoint(
        self,
        context: RequestContext,
        vpc_endpoint_id: VpcEndpointId,
        dry_run: Boolean = None,
        reset_policy: Boolean = None,
        policy_document: String = None,
        add_route_table_ids: VpcEndpointRouteTableIdList = None,
        remove_route_table_ids: VpcEndpointRouteTableIdList = None,
        add_subnet_ids: VpcEndpointSubnetIdList = None,
        remove_subnet_ids: VpcEndpointSubnetIdList = None,
        add_security_group_ids: VpcEndpointSecurityGroupIdList = None,
        remove_security_group_ids: VpcEndpointSecurityGroupIdList = None,
        ip_address_type: IpAddressType = None,
        dns_options: DnsOptionsSpecification = None,
        private_dns_enabled: Boolean = None,
        subnet_configurations: SubnetConfigurationsList = None,
        **kwargs,
    ) -> ModifyVpcEndpointResult:
        """Apply modifications directly to the VPC endpoint object stored in the EC2 backend.

        Only the policy document, route table IDs, subnet IDs and the private DNS flag are
        applied. The remaining parameters (``dry_run``, ``reset_policy``, security group IDs,
        ``ip_address_type``, ``dns_options``, ``subnet_configurations``) are accepted but not
        used by this implementation.

        :raises InvalidVpcEndPointIdError: if no endpoint with the given ID exists
        """
        backend = get_ec2_backend(context.account_id, context.region)

        endpoint = backend.vpc_end_points.get(vpc_endpoint_id)
        if not endpoint:
            raise InvalidVpcEndPointIdError(vpc_endpoint_id)

        if policy_document is not None:
            endpoint.policy_document = policy_document

        # additions are applied before removals, separately for each ID list
        id_list_changes = (
            ("route_table_ids", add_route_table_ids, remove_route_table_ids),
            ("subnet_ids", add_subnet_ids, remove_subnet_ids),
        )
        for attr, to_add, to_remove in id_list_changes:
            if to_add is not None:
                getattr(endpoint, attr).extend(to_add)
            if to_remove is not None:
                remaining = [id_ for id_ in getattr(endpoint, attr) if id_ not in to_remove]
                setattr(endpoint, attr, remaining)

        if private_dns_enabled is not None:
            endpoint.private_dns_enabled = private_dns_enabled

        return ModifyVpcEndpointResult(Return=True)
×
262

263
    @handler("ModifySubnetAttribute", expand=False)
    def modify_subnet_attribute(
        self, context: RequestContext, request: ModifySubnetAttributeRequest
    ) -> None:
        """Modify a subnet attribute, with a local fallback for attributes moto rejects.

        The request is first forwarded to moto. If that fails with a ``ResponseParserError``
        or an error whose text contains ``InvalidParameterValue``, the private DNS name
        options and DNS64 settings from the request are written to the backend directly.
        Any other exception is re-raised.
        """
        try:
            return call_moto(context)
        except Exception as e:
            handled_locally = (
                isinstance(e, ResponseParserError) or "InvalidParameterValue" in str(e)
            )
            if not handled_locally:
                raise

            backend = get_ec2_backend(context.account_id, context.region)

            # fix setting subnet attributes currently not supported upstream
            subnet_id = request["SubnetId"]
            dns_options_attr = camelcase_to_underscores("PrivateDnsNameOptionsOnLaunch")

            hostname_type = request.get("PrivateDnsHostnameTypeOnLaunch")
            if hostname_type:
                backend.modify_subnet_attribute(
                    subnet_id, dns_options_attr, {"HostnameType": hostname_type}
                )

            # these flags are compared against None explicitly, as their value may be False
            record_flags = (
                ("EnableResourceNameDnsAAAARecordOnLaunch", "EnableResourceNameDnsAAAARecord"),
                ("EnableResourceNameDnsARecordOnLaunch", "EnableResourceNameDnsARecord"),
            )
            for request_key, option_key in record_flags:
                flag = request.get(request_key)
                if flag is not None:
                    backend.modify_subnet_attribute(
                        subnet_id, dns_options_attr, {option_key: flag["Value"]}
                    )

            dns64 = request.get("EnableDns64")
            if dns64 is not None:
                backend.modify_subnet_attribute(
                    subnet_id, camelcase_to_underscores("EnableDns64"), dns64["Value"]
                )
×
298

299
    @handler("CreateSubnet", expand=False)
    def create_subnet(
        self, context: RequestContext, request: CreateSubnetRequest
    ) -> CreateSubnetResult:
        """Create the subnet via moto, then record its hostname type on the backend.

        The hostname type is taken from ``PrivateDnsHostnameTypeOnLaunch`` and defaults to
        ``ip-name`` when the request does not specify one.
        """
        response = call_moto(context)
        backend = get_ec2_backend(context.account_id, context.region)
        dns_options = {"HostnameType": request.get("PrivateDnsHostnameTypeOnLaunch", "ip-name")}
        backend.modify_subnet_attribute(
            response["Subnet"]["SubnetId"],
            camelcase_to_underscores("PrivateDnsNameOptionsOnLaunch"),
            dns_options,
        )
        return response
1✔
311

312
    @handler("RevokeSecurityGroupEgress", expand=False)
    def revoke_security_group_egress(
        self,
        context: RequestContext,
        revoke_security_group_egress_request: RevokeSecurityGroupEgressRequest,
    ) -> RevokeSecurityGroupEgressResult:
        """Revoke egress rules via moto, tolerating a missing rule on a group without egress rules.

        If moto reports that the specified rule does not exist and the security group has no
        egress rules, a successful result is returned. Every other error is re-raised.
        """
        try:
            return call_moto(context)
        except Exception as e:
            if "specified rule does not exist" not in str(e):
                raise

            backend = get_ec2_backend(context.account_id, context.region)
            group = backend.get_security_group_by_name_or_id(
                revoke_security_group_egress_request["GroupId"]
            )
            if group and not group.egress_rules:
                return RevokeSecurityGroupEgressResult(Return=True)
            raise
×
328

329
    @handler("DescribeSubnets", expand=False)
    def describe_subnets(
        self,
        context: RequestContext,
        request: DescribeSubnetsRequest,
    ) -> DescribeSubnetsResult:
        """Describe subnets via moto and add attributes listed in ``ADDITIONAL_SUBNET_ATTRS``.

        An attribute is only added when the backend subnet object has it and the response
        entry does not already contain the corresponding key.
        """
        result = call_moto(context)
        backend = get_ec2_backend(context.account_id, context.region)

        # add additional/missing attributes in subnet responses
        for entry in result.get("Subnets", []):
            zone_subnets = backend.subnets[entry["AvailabilityZone"]]
            subnet_obj = zone_subnets.get(entry["SubnetId"])
            for attr in ADDITIONAL_SUBNET_ATTRS:
                if not hasattr(subnet_obj, attr):
                    continue
                response_key = first_char_to_upper(underscores_to_camelcase(attr))
                if response_key in entry:
                    continue
                entry[response_key] = getattr(subnet_obj, attr)
        return result
1✔
346

347
    @handler("CreateTransitGateway", expand=False)
    def create_transit_gateway(
        self,
        context: RequestContext,
        request: CreateTransitGatewayRequest,
    ) -> CreateTransitGatewayResult:
        """Create the transit gateway via moto and merge the backend object's options into the response."""
        result = call_moto(context)
        backend = get_ec2_backend(context.account_id, context.region)
        gateway_id = result["TransitGateway"]["TransitGatewayId"]
        stored_gateway = backend.transit_gateways.get(gateway_id)
        response_options = result.get("TransitGateway").get("Options")
        response_options.update(stored_gateway.options)
        return result
1✔
359

360
    @handler("DescribeTransitGateways", expand=False)
    def describe_transit_gateways(
        self,
        context: RequestContext,
        request: DescribeTransitGatewaysRequest,
    ) -> DescribeTransitGatewaysResult:
        """Describe transit gateways via moto and merge each backend object's options into its entry."""
        result = call_moto(context)
        backend = get_ec2_backend(context.account_id, context.region)
        for entry in result.get("TransitGateways", []):
            stored_gateway = backend.transit_gateways.get(entry["TransitGatewayId"])
            entry["Options"].update(stored_gateway.options)
        return result
1✔
373

374
    @handler("CreateLaunchTemplate", expand=False)
    def create_launch_template(
        self,
        context: RequestContext,
        request: CreateLaunchTemplateRequest,
    ) -> CreateLaunchTemplateResult:
        """Validate the launch template request, then delegate creation to moto.

        :raises MissingParameterError: if ``LaunchTemplateData`` is empty
        :raises InvalidLaunchTemplateNameError: if the name is shorter than 3 or longer than
            128 characters, or contains characters outside ``a-zA-Z0-9.-_()/``
        """
        # parameter validation
        if not request["LaunchTemplateData"]:
            raise MissingParameterError(parameter="LaunchTemplateData")

        name = request["LaunchTemplateName"]
        name_is_valid = 3 <= len(name) <= 128 and re.fullmatch(r"[a-zA-Z0-9.\-_()/]*", name)
        if not name_is_valid:
            raise InvalidLaunchTemplateNameError()

        return call_moto(context)
1✔
389

390
    @handler("ModifyLaunchTemplate", expand=False)
    def modify_launch_template(
        self,
        context: RequestContext,
        request: ModifyLaunchTemplateRequest,
    ) -> ModifyLaunchTemplateResult:
        """Set the default version of a launch template stored in the EC2 backend.

        The template is identified by ``LaunchTemplateId`` or, if that is not given, by
        ``LaunchTemplateName``. When ``DefaultVersion`` is not provided, the template is
        returned unchanged.

        :raises InvalidLaunchTemplateIdError: if ``DefaultVersion`` does not refer to an
            existing version of the template (version numbers start at 1)
        """
        backend = get_ec2_backend(context.account_id, context.region)
        # use .get() so that a request carrying only the name falls through to the name lookup
        template_id = (
            request.get("LaunchTemplateId")
            or backend.launch_template_name_to_ids[request.get("LaunchTemplateName")]
        )
        template: MotoLaunchTemplate = backend.launch_templates[template_id]

        default_version = request.get("DefaultVersion")
        if default_version:
            version_number = int(default_version)
            # Versions are stored at index (number - 1). Reject 0 and negative numbers
            # explicitly: a negative list index would silently resolve to an existing version.
            if not 1 <= version_number <= len(template.versions):
                raise InvalidLaunchTemplateIdError()
            template.default_version_number = version_number

        return ModifyLaunchTemplateResult(
            LaunchTemplate=LaunchTemplate(
                LaunchTemplateId=template.id,
                LaunchTemplateName=template.name,
                CreateTime=template.create_time,
                DefaultVersionNumber=template.default_version_number,
                LatestVersionNumber=template.latest_version_number,
                Tags=template.tags,
            )
        )
422

423
    @handler("DescribeVpcEndpointServices", expand=False)
    def describe_vpc_endpoint_services(
        self,
        context: RequestContext,
        request: DescribeVpcEndpointServicesRequest,
    ) -> DescribeVpcEndpointServicesResult:
        """Ensure an ``execute-api`` entry exists among moto's default endpoint services, then call moto.

        The entry is appended in place to the list returned by
        ``VPCBackend._collect_default_endpoint_services`` when it is missing.
        """
        default_services = VPCBackend._collect_default_endpoint_services(
            account_id=context.account_id, region=context.region
        )

        moto_backend = get_moto_backend(context)
        known_names = {svc["ServiceName"] for svc in default_services}
        execute_api_name = f"com.amazonaws.{context.region}.execute-api"

        if execute_api_name not in known_names:
            # ensure that the service entry for execute-api exists
            zone_names = [zone.name for zone in moto_backend.describe_availability_zones()]
            private_dns_name = f"*.execute-api.{context.region}.amazonaws.com"
            default_services.append(
                {
                    "ServiceName": execute_api_name,
                    "ServiceId": f"vpce-svc-{short_uid()}",
                    "ServiceType": [{"ServiceType": "Interface"}],
                    "AvailabilityZones": zone_names,
                    "Owner": "amazon",
                    "BaseEndpointDnsNames": [f"execute-api.{context.region}.vpce.amazonaws.com"],
                    "PrivateDnsName": private_dns_name,
                    "PrivateDnsNames": [{"PrivateDnsName": private_dns_name}],
                    "VpcEndpointPolicySupported": True,
                    "AcceptanceRequired": False,
                    "ManagesVpcEndpoints": False,
                    "PrivateDnsNameVerificationState": "verified",
                    "SupportedIpAddressTypes": ["ipv4"],
                }
            )

        return call_moto(context)
1✔
460

461
    @handler("DescribeVpcEndpoints", expand=False)
    def describe_vpc_endpoints(
        self,
        context: RequestContext,
        request: DescribeVpcEndpointsRequest,
    ) -> DescribeVpcEndpointsResult:
        """Describe VPC endpoints via moto and fill in defaults for fields missing in the response.

        Each endpoint gets default values for ``DnsOptions``, ``IpAddressType``,
        ``RequesterManaged`` and ``RouteTableIds`` if they are not set, and the ``Version``
        key is removed from its policy document.
        """
        result: DescribeVpcEndpointsResult = call_moto(context)

        # tolerate a response without (or with an empty) "VpcEndpoints" entry instead of
        # failing when iterating over None
        for endpoint in result.get("VpcEndpoints") or []:
            endpoint.setdefault("DnsOptions", DnsOptions(DnsRecordIpType=DnsRecordIpType.ipv4))
            endpoint.setdefault("IpAddressType", IpAddressType.ipv4)
            endpoint.setdefault("RequesterManaged", False)
            endpoint.setdefault("RouteTableIds", [])
            # AWS parity: Version should not be contained in the policy response
            policy = endpoint.get("PolicyDocument")
            if policy and '"Version":' in policy:
                policy = json.loads(policy)
                policy.pop("Version", None)
                endpoint["PolicyDocument"] = json.dumps(policy)

        return result
1✔
482

483
    @handler("CreateFlowLogs", expand=False)
    def create_flow_logs(
        self,
        context: RequestContext,
        request: CreateFlowLogsRequest,
        **kwargs,
    ) -> CreateFlowLogsResult:
        """Create flow logs, validating S3 destinations here instead of in moto.

        Requests whose ``LogDestinationType`` is not ``s3`` are forwarded to moto unchanged.
        For S3 destinations, the bucket is checked with ``head_bucket``; if that check fails,
        every requested resource ID is reported as unsuccessful. Otherwise moto is called
        with a placeholder destination type, which is replaced by ``s3`` on the created
        flow logs afterwards.

        :raises CommonServiceException: (``InvalidParameter``) for invalid combinations of
            ``LogDestination``, ``LogGroupName`` and ``LogDestinationType``
        """
        if request.get("LogDestination") and request.get("LogGroupName"):
            raise CommonServiceException(
                code="InvalidParameter",
                message="Please only provide LogGroupName or only provide LogDestination.",
            )

        if request.get("LogDestinationType") != "s3":
            return call_moto(context)

        if request.get("LogGroupName"):
            raise CommonServiceException(
                code="InvalidParameter",
                message="LogDestination type must be cloud-watch-logs if LogGroupName is provided.",
            )
        destination_arn = request.get("LogDestination")
        if not destination_arn:
            raise CommonServiceException(
                code="InvalidParameter",
                message="LogDestination can't be empty if LogGroupName is not provided.",
            )

        # Moto will check in memory whether the bucket exists in Moto itself
        # we modify the request to not send a destination, so that the validation does not happen
        # we can add the validation ourselves
        moto_request = copy.deepcopy(request)
        moto_request["LogDestinationType"] = "__placeholder__"
        arn_resource = destination_arn.split(":", 5)[5]
        bucket_name = arn_resource.split("/")[0]
        # TODO: validate how IAM is enforced? probably with DeliverLogsPermissionArn
        s3_client = connect_to().s3
        try:
            s3_client.head_bucket(Bucket=bucket_name)
        except Exception as e:
            LOG.debug(
                "An exception occurred when trying to create FlowLogs with S3 destination: %s",
                e,
            )
            failures = [
                UnsuccessfulItem(
                    Error=UnsuccessfulItemError(
                        Code="400",
                        Message=f"LogDestination: {bucket_name} does not exist",
                    ),
                    ResourceId=resource_id,
                )
                for resource_id in request.get("ResourceIds", [])
            ]
            return CreateFlowLogsResult(FlowLogIds=[], Unsuccessful=failures)

        response: CreateFlowLogsResult = call_moto_with_request(context, moto_request)
        moto_backend = get_moto_backend(context)
        for flow_log_id in response["FlowLogIds"]:
            flow_log = moto_backend.flow_logs.get(flow_log_id)
            if not flow_log:
                continue
            # just to be sure to not override another value, we only replace if it's the placeholder
            flow_log.log_destination_type = flow_log.log_destination_type.replace(
                "__placeholder__", "s3"
            )

        return response
1✔
548

549
    @handler("GetSecurityGroupsForVpc", expand=False)
    def get_security_groups_for_vpc(
        self,
        context: RequestContext,
        get_security_groups_for_vpc_request: GetSecurityGroupsForVpcRequest,
    ) -> GetSecurityGroupsForVpcResult:
        """List the backend's security groups matching the ``vpc-id`` filter for the requested VPC.

        The result is not paginated; ``NextToken`` is always ``None``.
        """
        vpc_id = get_security_groups_for_vpc_request.get("VpcId")
        backend = get_ec2_backend(context.account_id, context.region)
        matching_groups = backend.describe_security_groups(filters={"vpc-id": [vpc_id]})

        entries = []
        for group in matching_groups:
            tags = [{"Key": tag.get("key"), "Value": tag.get("value")} for tag in group.get_tags()]
            entries.append(
                SecurityGroupForVpc(
                    Description=group.description,
                    GroupId=group.id,
                    GroupName=group.name,
                    OwnerId=context.account_id,
                    PrimaryVpcId=group.vpc_id,
                    Tags=tags,
                )
            )
        return GetSecurityGroupsForVpcResult(SecurityGroupForVpcs=entries, NextToken=None)
1✔
572

573

574
@patch(SubnetBackend.modify_subnet_attribute)
def modify_subnet_attribute(fn, self, subnet_id, attr_name, attr_value):
    """Store attributes listed in ``ADDITIONAL_SUBNET_ATTRS`` directly on the subnet object.

    All other attributes are delegated to the patched function ``fn``. The subnet is looked
    up first in either case.
    """
    subnet = self.get_subnet(subnet_id)
    if attr_name not in ADDITIONAL_SUBNET_ATTRS:
        return fn(self, subnet_id, attr_name, attr_value)

    # private dns name options on launch contains dict with keys EnableResourceNameDnsARecord
    # and EnableResourceNameDnsAAAARecord, HostnameType - merge into an existing dict
    if attr_name == "private_dns_name_options_on_launch" and hasattr(subnet, attr_name):
        getattr(subnet, attr_name).update(attr_value)
    else:
        setattr(subnet, attr_name, attr_value)
    return None
1✔
589

590

591
def get_moto_backend(context: RequestContext) -> EC2Backend:
    """Look up the moto EC2 backend for the account and region of the request context."""
    account_backends = ec2_backends[context.account_id]
    return account_backends[context.region]
1✔
594

595

596
@patch(Subnet.get_filter_value)
def get_filter_value(fn, self, filter_name):
    """Resolve the IPv6 CIDR block association-id filters; defer all other filters to ``fn``."""
    association_id_filters = (
        "ipv6CidrBlockAssociationSet.associationId",
        "ipv6-cidr-block-association.association-id",
    )
    if filter_name not in association_id_filters:
        return fn(self, filter_name)
    return self.ipv6_cidr_block_associations
1✔
604

605

606
@patch(TransitGatewayAttachmentBackend.delete_transit_gateway_vpc_attachment)
def delete_transit_gateway_vpc_attachment(fn, self, transit_gateway_attachment_id, **kwargs):
    """Set the attachment's state to ``deleted`` and return it.

    The patched function ``fn`` is not called.
    """
    attachment = self.transit_gateway_attachments.get(transit_gateway_attachment_id)
    attachment.state = "deleted"
    return attachment
1✔
611

612

613
@patch(FlowLogsBackend._validate_request)
def _validate_request(
    fn,
    self,
    log_group_name: str,
    log_destination: str,
    log_destination_type: str,
    max_aggregation_interval: str,
    deliver_logs_permission_arn: str,
) -> None:
    """Skip validation when a destination is given without a destination type.

    In every other case the patched function ``fn`` is called with the unchanged arguments.
    """
    skip_validation = bool(log_destination) and not log_destination_type
    if skip_validation:
        # this is to fix the S3 destination issue, the validation will occur in the provider
        return

    fn(
        self,
        log_group_name=log_group_name,
        log_destination=log_destination,
        log_destination_type=log_destination_type,
        max_aggregation_interval=max_aggregation_interval,
        deliver_logs_permission_arn=deliver_logs_permission_arn,
    ) if False else fn(
        self,
        log_group_name,
        log_destination,
        log_destination_type,
        max_aggregation_interval,
        deliver_logs_permission_arn,
    )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc