• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 17265699519

27 Aug 2025 11:28AM UTC coverage: 86.827% (-0.01%) from 86.837%
17265699519

push

github

web-flow
Fix SQS tests failing due to missing snapshot update after #12957 (#13062)

67057 of 77231 relevant lines covered (86.83%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.8
/localstack-core/localstack/services/ec2/provider.py
1
import copy
1✔
2
import json
1✔
3
import logging
1✔
4
import re
1✔
5
from abc import ABC
1✔
6
from datetime import UTC, datetime
1✔
7

8
from botocore.parsers import ResponseParserError
1✔
9
from moto.core.utils import camelcase_to_underscores, underscores_to_camelcase
1✔
10
from moto.ec2.exceptions import InvalidVpcEndPointIdError
1✔
11
from moto.ec2.models import (
1✔
12
    EC2Backend,
13
    FlowLogsBackend,
14
    SubnetBackend,
15
    TransitGatewayAttachmentBackend,
16
    VPCBackend,
17
    ec2_backends,
18
)
19
from moto.ec2.models.launch_templates import LaunchTemplate as MotoLaunchTemplate
1✔
20
from moto.ec2.models.subnets import Subnet
1✔
21

22
from localstack.aws.api import CommonServiceException, RequestContext, handler
1✔
23
from localstack.aws.api.ec2 import (
1✔
24
    AvailabilityZone,
25
    Boolean,
26
    CreateFlowLogsRequest,
27
    CreateFlowLogsResult,
28
    CreateLaunchTemplateRequest,
29
    CreateLaunchTemplateResult,
30
    CreateSubnetRequest,
31
    CreateSubnetResult,
32
    CreateTransitGatewayRequest,
33
    CreateTransitGatewayResult,
34
    CurrencyCodeValues,
35
    DescribeAvailabilityZonesRequest,
36
    DescribeAvailabilityZonesResult,
37
    DescribeReservedInstancesOfferingsRequest,
38
    DescribeReservedInstancesOfferingsResult,
39
    DescribeReservedInstancesRequest,
40
    DescribeReservedInstancesResult,
41
    DescribeSubnetsRequest,
42
    DescribeSubnetsResult,
43
    DescribeTransitGatewaysRequest,
44
    DescribeTransitGatewaysResult,
45
    DescribeVpcEndpointServicesRequest,
46
    DescribeVpcEndpointServicesResult,
47
    DescribeVpcEndpointsRequest,
48
    DescribeVpcEndpointsResult,
49
    DnsOptions,
50
    DnsOptionsSpecification,
51
    DnsRecordIpType,
52
    Ec2Api,
53
    GetSecurityGroupsForVpcRequest,
54
    GetSecurityGroupsForVpcResult,
55
    InstanceType,
56
    IpAddressType,
57
    LaunchTemplate,
58
    ModifyLaunchTemplateRequest,
59
    ModifyLaunchTemplateResult,
60
    ModifySubnetAttributeRequest,
61
    ModifyVpcEndpointResult,
62
    OfferingClassType,
63
    OfferingTypeValues,
64
    PricingDetail,
65
    PurchaseReservedInstancesOfferingRequest,
66
    PurchaseReservedInstancesOfferingResult,
67
    RecurringCharge,
68
    RecurringChargeFrequency,
69
    ReservedInstances,
70
    ReservedInstancesOffering,
71
    ReservedInstanceState,
72
    RevokeSecurityGroupEgressRequest,
73
    RevokeSecurityGroupEgressResult,
74
    RIProductDescription,
75
    SecurityGroupForVpc,
76
    String,
77
    SubnetConfigurationsList,
78
    Tenancy,
79
    UnsuccessfulItem,
80
    UnsuccessfulItemError,
81
    VpcEndpointId,
82
    VpcEndpointRouteTableIdList,
83
    VpcEndpointSecurityGroupIdList,
84
    VpcEndpointSubnetIdList,
85
    scope,
86
)
87
from localstack.aws.connect import connect_to
1✔
88
from localstack.services.ec2.exceptions import (
1✔
89
    InvalidLaunchTemplateIdError,
90
    InvalidLaunchTemplateNameError,
91
    MissingParameterError,
92
)
93
from localstack.services.ec2.models import get_ec2_backend
1✔
94
from localstack.services.ec2.patches import apply_patches
1✔
95
from localstack.services.moto import call_moto, call_moto_with_request
1✔
96
from localstack.services.plugins import ServiceLifecycleHook
1✔
97
from localstack.utils.patch import patch
1✔
98
from localstack.utils.strings import first_char_to_upper, long_uid, short_uid
1✔
99

100
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
1✔
101

102
# additional subnet attributes not yet supported upstream
103
# Attribute names (moto-style snake_case) that the SubnetBackend.modify_subnet_attribute
# patch in this module stores directly on the subnet object, and that the
# DescribeSubnets handler copies into its response when they are missing there.
ADDITIONAL_SUBNET_ATTRS = ("private_dns_name_options_on_launch", "enable_dns64")
1✔
104

105

106
class Ec2Provider(Ec2Api, ABC, ServiceLifecycleHook):
    """EC2 provider that delegates to moto and patches up selected operations.

    Handlers either forward the request to moto via ``call_moto`` and post-process
    the response, or build the response themselves from the EC2 backend returned by
    ``get_ec2_backend``.
    """

    def on_after_init(self):
        """Apply the EC2 patches (``apply_patches``) after the provider is initialised."""
        apply_patches()

    @handler("DescribeAvailabilityZones", expand=False)
    def describe_availability_zones(
        self,
        context: RequestContext,
        describe_availability_zones_request: DescribeAvailabilityZonesRequest,
    ) -> DescribeAvailabilityZonesResult:
        """Describe availability zones, filtering by ``ZoneNames``/``ZoneIds`` if given.

        When either filter is present the zones are queried from the backend and
        reported with ``State="available"`` and no messages; otherwise the request
        is forwarded to moto unchanged.
        """
        backend = get_ec2_backend(context.account_id, context.region)
        zone_names = describe_availability_zones_request.get("ZoneNames")
        zone_ids = describe_availability_zones_request.get("ZoneIds")
        if zone_names or zone_ids:
            filtered_zones = backend.describe_availability_zones(
                zone_names=zone_names, zone_ids=zone_ids
            )
            availability_zones = [
                AvailabilityZone(
                    State="available",
                    Messages=[],
                    RegionName=zone.region_name,
                    ZoneName=zone.name,
                    ZoneId=zone.zone_id,
                    ZoneType=zone.zone_type,
                )
                for zone in filtered_zones
            ]
            return DescribeAvailabilityZonesResult(AvailabilityZones=availability_zones)
        return call_moto(context)

    @handler("DescribeReservedInstancesOfferings", expand=False)
    def describe_reserved_instances_offerings(
        self,
        context: RequestContext,
        describe_reserved_instances_offerings_request: DescribeReservedInstancesOfferingsRequest,
    ) -> DescribeReservedInstancesOfferingsResult:
        """Return a single hard-coded reserved-instances offering.

        The request is not inspected; only the offering id (``long_uid()``) is
        generated per call.
        """
        return DescribeReservedInstancesOfferingsResult(
            ReservedInstancesOfferings=[
                ReservedInstancesOffering(
                    AvailabilityZone="eu-central-1a",
                    Duration=2628000,
                    FixedPrice=0.0,
                    InstanceType=InstanceType.t2_small,
                    ProductDescription=RIProductDescription.Linux_UNIX,
                    ReservedInstancesOfferingId=long_uid(),
                    UsagePrice=0.0,
                    CurrencyCode=CurrencyCodeValues.USD,
                    InstanceTenancy=Tenancy.default,
                    Marketplace=True,
                    PricingDetails=[PricingDetail(Price=0.0, Count=3)],
                    RecurringCharges=[
                        RecurringCharge(Amount=0.25, Frequency=RecurringChargeFrequency.Hourly)
                    ],
                    Scope=scope.Availability_Zone,
                )
            ]
        )

    @handler("DescribeReservedInstances", expand=False)
    def describe_reserved_instances(
        self,
        context: RequestContext,
        describe_reserved_instances_request: DescribeReservedInstancesRequest,
    ) -> DescribeReservedInstancesResult:
        """Return a single hard-coded reserved-instances entry.

        The request is not inspected; only the reservation id (``long_uid()``) is
        generated per call.
        """
        return DescribeReservedInstancesResult(
            ReservedInstances=[
                ReservedInstances(
                    AvailabilityZone="eu-central-1a",
                    Duration=2628000,
                    End=datetime(2016, 6, 30, tzinfo=UTC),
                    FixedPrice=0.0,
                    InstanceCount=2,
                    InstanceType=InstanceType.t2_small,
                    ProductDescription=RIProductDescription.Linux_UNIX,
                    ReservedInstancesId=long_uid(),
                    Start=datetime(2016, 1, 1, tzinfo=UTC),
                    State=ReservedInstanceState.active,
                    UsagePrice=0.05,
                    CurrencyCode=CurrencyCodeValues.USD,
                    InstanceTenancy=Tenancy.default,
                    OfferingClass=OfferingClassType.standard,
                    OfferingType=OfferingTypeValues.Partial_Upfront,
                    RecurringCharges=[
                        RecurringCharge(Amount=0.05, Frequency=RecurringChargeFrequency.Hourly)
                    ],
                    Scope=scope.Availability_Zone,
                )
            ]
        )

    @handler("PurchaseReservedInstancesOffering", expand=False)
    def purchase_reserved_instances_offering(
        self,
        context: RequestContext,
        purchase_reserved_instances_offerings_request: PurchaseReservedInstancesOfferingRequest,
    ) -> PurchaseReservedInstancesOfferingResult:
        """Return a generated reservation id; the request is not inspected."""
        return PurchaseReservedInstancesOfferingResult(
            ReservedInstancesId=long_uid(),
        )

    @handler("ModifyVpcEndpoint")
    def modify_vpc_endpoint(
        self,
        context: RequestContext,
        vpc_endpoint_id: VpcEndpointId,
        dry_run: Boolean = None,
        reset_policy: Boolean = None,
        policy_document: String = None,
        add_route_table_ids: VpcEndpointRouteTableIdList = None,
        remove_route_table_ids: VpcEndpointRouteTableIdList = None,
        add_subnet_ids: VpcEndpointSubnetIdList = None,
        remove_subnet_ids: VpcEndpointSubnetIdList = None,
        add_security_group_ids: VpcEndpointSecurityGroupIdList = None,
        remove_security_group_ids: VpcEndpointSecurityGroupIdList = None,
        ip_address_type: IpAddressType = None,
        dns_options: DnsOptionsSpecification = None,
        private_dns_enabled: Boolean = None,
        subnet_configurations: SubnetConfigurationsList = None,
        **kwargs,
    ) -> ModifyVpcEndpointResult:
        """Update a VPC endpoint stored in the backend.

        Only ``policy_document``, the route-table and subnet add/remove lists and
        ``private_dns_enabled`` are applied. The remaining parameters are accepted
        but currently not read by this handler.

        :raises InvalidVpcEndPointIdError: if no endpoint with the given id exists.
        """
        backend = get_ec2_backend(context.account_id, context.region)

        vpc_endpoint = backend.vpc_end_points.get(vpc_endpoint_id)
        if not vpc_endpoint:
            raise InvalidVpcEndPointIdError(vpc_endpoint_id)

        if policy_document is not None:
            vpc_endpoint.policy_document = policy_document

        if add_route_table_ids is not None:
            vpc_endpoint.route_table_ids.extend(add_route_table_ids)

        if remove_route_table_ids is not None:
            vpc_endpoint.route_table_ids = [
                id_ for id_ in vpc_endpoint.route_table_ids if id_ not in remove_route_table_ids
            ]

        if add_subnet_ids is not None:
            vpc_endpoint.subnet_ids.extend(add_subnet_ids)

        if remove_subnet_ids is not None:
            vpc_endpoint.subnet_ids = [
                id_ for id_ in vpc_endpoint.subnet_ids if id_ not in remove_subnet_ids
            ]

        if private_dns_enabled is not None:
            vpc_endpoint.private_dns_enabled = private_dns_enabled

        return ModifyVpcEndpointResult(Return=True)

    @handler("ModifySubnetAttribute", expand=False)
    def modify_subnet_attribute(
        self, context: RequestContext, request: ModifySubnetAttributeRequest
    ) -> None:
        """Modify a subnet attribute, falling back to local handling when moto fails.

        The request is first forwarded to moto. If that raises a
        ``ResponseParserError`` or an error whose text contains
        ``InvalidParameterValue``, the attributes moto does not support are written
        to the backend directly; any other exception is re-raised.
        """
        try:
            return call_moto(context)
        except Exception as e:
            if not isinstance(e, ResponseParserError) and "InvalidParameterValue" not in str(e):
                raise

            backend = get_ec2_backend(context.account_id, context.region)

            # fix setting subnet attributes currently not supported upstream
            subnet_id = request["SubnetId"]
            host_type = request.get("PrivateDnsHostnameTypeOnLaunch")
            a_record_on_launch = request.get("EnableResourceNameDnsARecordOnLaunch")
            aaaa_record_on_launch = request.get("EnableResourceNameDnsAAAARecordOnLaunch")
            enable_dns64 = request.get("EnableDns64")

            if host_type:
                attr_name = camelcase_to_underscores("PrivateDnsNameOptionsOnLaunch")
                value = {"HostnameType": host_type}
                backend.modify_subnet_attribute(subnet_id, attr_name, value)
            # explicitly checking for None, as the wrapped value may legitimately be False
            if aaaa_record_on_launch is not None:
                attr_name = camelcase_to_underscores("PrivateDnsNameOptionsOnLaunch")
                value = {"EnableResourceNameDnsAAAARecord": aaaa_record_on_launch["Value"]}
                backend.modify_subnet_attribute(subnet_id, attr_name, value)
            if a_record_on_launch is not None:
                attr_name = camelcase_to_underscores("PrivateDnsNameOptionsOnLaunch")
                value = {"EnableResourceNameDnsARecord": a_record_on_launch["Value"]}
                backend.modify_subnet_attribute(subnet_id, attr_name, value)
            if enable_dns64 is not None:
                attr_name = camelcase_to_underscores("EnableDns64")
                backend.modify_subnet_attribute(subnet_id, attr_name, enable_dns64["Value"])

    @handler("CreateSubnet", expand=False)
    def create_subnet(
        self, context: RequestContext, request: CreateSubnetRequest
    ) -> CreateSubnetResult:
        """Create the subnet via moto, then record its private DNS hostname type.

        The hostname type comes from ``PrivateDnsHostnameTypeOnLaunch`` and defaults
        to ``"ip-name"``.
        """
        response = call_moto(context)
        backend = get_ec2_backend(context.account_id, context.region)
        subnet_id = response["Subnet"]["SubnetId"]
        host_type = request.get("PrivateDnsHostnameTypeOnLaunch", "ip-name")
        attr_name = camelcase_to_underscores("PrivateDnsNameOptionsOnLaunch")
        value = {"HostnameType": host_type}
        backend.modify_subnet_attribute(subnet_id, attr_name, value)
        return response

    @handler("RevokeSecurityGroupEgress", expand=False)
    def revoke_security_group_egress(
        self,
        context: RequestContext,
        revoke_security_group_egress_request: RevokeSecurityGroupEgressRequest,
    ) -> RevokeSecurityGroupEgressResult:
        """Revoke egress rules via moto, tolerating a missing rule on an empty group.

        If moto reports that the specified rule does not exist and the group has no
        egress rules left, the call is reported as successful; every other error is
        re-raised.
        """
        try:
            return call_moto(context)
        except Exception as e:
            if "specified rule does not exist" in str(e):
                backend = get_ec2_backend(context.account_id, context.region)
                group_id = revoke_security_group_egress_request["GroupId"]
                group = backend.get_security_group_by_name_or_id(group_id)
                if group and not group.egress_rules:
                    return RevokeSecurityGroupEgressResult(Return=True)
            raise

    @handler("DescribeSubnets", expand=False)
    def describe_subnets(
        self,
        context: RequestContext,
        request: DescribeSubnetsRequest,
    ) -> DescribeSubnetsResult:
        """Describe subnets via moto and add attributes moto does not return.

        For every name in ``ADDITIONAL_SUBNET_ATTRS`` that is set on the backend
        subnet object but absent from the response entry, the value is copied into
        the response under its CamelCase key.
        """
        result = call_moto(context)
        backend = get_ec2_backend(context.account_id, context.region)
        # add additional/missing attributes in subnet responses
        for subnet in result.get("Subnets", []):
            subnet_obj = backend.subnets[subnet["AvailabilityZone"]].get(subnet["SubnetId"])
            for attr in ADDITIONAL_SUBNET_ATTRS:
                if hasattr(subnet_obj, attr):
                    attr_name = first_char_to_upper(underscores_to_camelcase(attr))
                    if attr_name not in subnet:
                        subnet[attr_name] = getattr(subnet_obj, attr)
        return result

    @handler("CreateTransitGateway", expand=False)
    def create_transit_gateway(
        self,
        context: RequestContext,
        request: CreateTransitGatewayRequest,
    ) -> CreateTransitGatewayResult:
        """Create a transit gateway via moto and merge the backend options into the result."""
        result = call_moto(context)
        backend = get_ec2_backend(context.account_id, context.region)
        transit_gateway_id = result["TransitGateway"]["TransitGatewayId"]
        transit_gateway = backend.transit_gateways.get(transit_gateway_id)
        result.get("TransitGateway").get("Options").update(transit_gateway.options)
        return result

    @handler("DescribeTransitGateways", expand=False)
    def describe_transit_gateways(
        self,
        context: RequestContext,
        request: DescribeTransitGatewaysRequest,
    ) -> DescribeTransitGatewaysResult:
        """Describe transit gateways via moto and merge each gateway's backend options in."""
        result = call_moto(context)
        backend = get_ec2_backend(context.account_id, context.region)
        for transit_gateway in result.get("TransitGateways", []):
            transit_gateway_id = transit_gateway["TransitGatewayId"]
            tgw = backend.transit_gateways.get(transit_gateway_id)
            transit_gateway["Options"].update(tgw.options)
        return result

    @handler("CreateLaunchTemplate", expand=False)
    def create_launch_template(
        self,
        context: RequestContext,
        request: CreateLaunchTemplateRequest,
    ) -> CreateLaunchTemplateResult:
        """Validate the request and create the launch template via moto.

        :raises MissingParameterError: if ``LaunchTemplateData`` is missing or empty,
            or if ``LaunchTemplateName`` is missing.
        :raises InvalidLaunchTemplateNameError: if the name is not 3-128 characters
            long or contains characters outside ``a-zA-Z0-9.-_()/``.
        """
        # parameter validation; use .get() so that an absent key produces the
        # service error below instead of an unhandled KeyError
        if not request.get("LaunchTemplateData"):
            raise MissingParameterError(parameter="LaunchTemplateData")

        name = request.get("LaunchTemplateName")
        if name is None:
            raise MissingParameterError(parameter="LaunchTemplateName")
        if len(name) < 3 or len(name) > 128 or not re.fullmatch(r"[a-zA-Z0-9.\-_()/]*", name):
            raise InvalidLaunchTemplateNameError()

        return call_moto(context)

    @handler("ModifyLaunchTemplate", expand=False)
    def modify_launch_template(
        self,
        context: RequestContext,
        request: ModifyLaunchTemplateRequest,
    ) -> ModifyLaunchTemplateResult:
        """Change the default version of a launch template.

        The template is looked up by ``LaunchTemplateId`` or, if that is absent, by
        ``LaunchTemplateName``. When ``DefaultVersion`` is not supplied the template
        is returned unchanged.

        :raises InvalidLaunchTemplateIdError: if ``DefaultVersion`` does not refer to
            an existing version of the template.
        """
        backend = get_ec2_backend(context.account_id, context.region)
        # .get() so that a name-only request reaches the name lookup instead of
        # failing with a KeyError on the absent id
        template_id = (
            request.get("LaunchTemplateId")
            or backend.launch_template_name_to_ids[request.get("LaunchTemplateName")]
        )
        template: MotoLaunchTemplate = backend.launch_templates[template_id]

        default_version = request.get("DefaultVersion")
        if default_version:
            version_number = int(default_version)
            # versions are 1-based; reject non-positive numbers explicitly, since a
            # negative list index would silently wrap around to an existing version
            if version_number < 1:
                raise InvalidLaunchTemplateIdError()
            # check if defaultVersion exists
            try:
                template.versions[version_number - 1]
            except IndexError:
                raise InvalidLaunchTemplateIdError() from None

            template.default_version_number = version_number

        return ModifyLaunchTemplateResult(
            LaunchTemplate=LaunchTemplate(
                LaunchTemplateId=template.id,
                LaunchTemplateName=template.name,
                CreateTime=template.create_time,
                DefaultVersionNumber=template.default_version_number,
                LatestVersionNumber=template.latest_version_number,
                Tags=template.tags,
            )
        )

    @handler("DescribeVpcEndpointServices", expand=False)
    def describe_vpc_endpoint_services(
        self,
        context: RequestContext,
        request: DescribeVpcEndpointServicesRequest,
    ) -> DescribeVpcEndpointServicesResult:
        """Describe VPC endpoint services, making sure ``execute-api`` is listed.

        If moto's default endpoint services for the region lack an ``execute-api``
        entry, one is appended to that list before the request is forwarded to moto.
        """
        # NOTE(review): appending only affects the response if moto hands out the
        # same list object on the subsequent call - presumably it is cached; confirm.
        ep_services = VPCBackend._collect_default_endpoint_services(
            account_id=context.account_id, region=context.region
        )

        moto_backend = get_moto_backend(context)
        service_names = [s["ServiceName"] for s in ep_services]
        execute_api_name = f"com.amazonaws.{context.region}.execute-api"

        if execute_api_name not in service_names:
            # ensure that the service entry for execute-api exists
            zones = moto_backend.describe_availability_zones()
            zones = [zone.name for zone in zones]
            private_dns_name = f"*.execute-api.{context.region}.amazonaws.com"
            service = {
                "ServiceName": execute_api_name,
                "ServiceId": f"vpce-svc-{short_uid()}",
                "ServiceType": [{"ServiceType": "Interface"}],
                "AvailabilityZones": zones,
                "Owner": "amazon",
                "BaseEndpointDnsNames": [f"execute-api.{context.region}.vpce.amazonaws.com"],
                "PrivateDnsName": private_dns_name,
                "PrivateDnsNames": [{"PrivateDnsName": private_dns_name}],
                "VpcEndpointPolicySupported": True,
                "AcceptanceRequired": False,
                "ManagesVpcEndpoints": False,
                "PrivateDnsNameVerificationState": "verified",
                "SupportedIpAddressTypes": ["ipv4"],
            }
            ep_services.append(service)

        return call_moto(context)

    @handler("DescribeVpcEndpoints", expand=False)
    def describe_vpc_endpoints(
        self,
        context: RequestContext,
        request: DescribeVpcEndpointsRequest,
    ) -> DescribeVpcEndpointsResult:
        """Describe VPC endpoints via moto and fill in defaults for missing fields.

        Each endpoint gets defaults for ``DnsOptions``, ``IpAddressType``,
        ``RequesterManaged`` and ``RouteTableIds``, and a top-level ``Version`` key is
        removed from its policy document.
        """
        result: DescribeVpcEndpointsResult = call_moto(context)

        # default to an empty list so a response without the key is returned as-is,
        # in line with the other describe_* handlers of this provider
        for endpoint in result.get("VpcEndpoints", []):
            endpoint.setdefault("DnsOptions", DnsOptions(DnsRecordIpType=DnsRecordIpType.ipv4))
            endpoint.setdefault("IpAddressType", IpAddressType.ipv4)
            endpoint.setdefault("RequesterManaged", False)
            endpoint.setdefault("RouteTableIds", [])
            # AWS parity: Version should not be contained in the policy response
            policy = endpoint.get("PolicyDocument")
            if policy and '"Version":' in policy:
                policy = json.loads(policy)
                policy.pop("Version", None)
                endpoint["PolicyDocument"] = json.dumps(policy)

        return result

    @handler("CreateFlowLogs", expand=False)
    def create_flow_logs(
        self,
        context: RequestContext,
        request: CreateFlowLogsRequest,
        **kwargs,
    ) -> CreateFlowLogsResult:
        """Create flow logs, validating S3 destinations here instead of in moto.

        For ``LogDestinationType == "s3"`` the bucket is checked with ``head_bucket``.
        If that check fails, every requested resource id is reported as unsuccessful.
        Otherwise the request is sent to moto with a placeholder destination type,
        which is replaced by ``"s3"`` on the created flow logs afterwards. All other
        destination types are forwarded to moto unchanged.

        :raises CommonServiceException: ``InvalidParameter`` if both ``LogGroupName``
            and ``LogDestination`` are given, or if an S3 request carries a
            ``LogGroupName`` or lacks a ``LogDestination``.
        """
        if request.get("LogDestination") and request.get("LogGroupName"):
            raise CommonServiceException(
                code="InvalidParameter",
                message="Please only provide LogGroupName or only provide LogDestination.",
            )
        if request.get("LogDestinationType") == "s3":
            if request.get("LogGroupName"):
                raise CommonServiceException(
                    code="InvalidParameter",
                    message="LogDestination type must be cloud-watch-logs if LogGroupName is provided.",
                )
            elif not (bucket_arn := request.get("LogDestination")):
                raise CommonServiceException(
                    code="InvalidParameter",
                    message="LogDestination can't be empty if LogGroupName is not provided.",
                )

            # Moto will check in memory whether the bucket exists in Moto itself
            # we modify the request to not send a destination, so that the validation does not happen
            # we can add the validation ourselves
            service_request = copy.deepcopy(request)
            service_request["LogDestinationType"] = "__placeholder__"
            # sixth ":"-separated ARN field, up to the first "/" (i.e. without a key prefix)
            bucket_name = bucket_arn.split(":", 5)[5].split("/")[0]
            # TODO: validate how IAM is enforced? probably with DeliverLogsPermissionArn
            s3_client = connect_to().s3
            try:
                s3_client.head_bucket(Bucket=bucket_name)
            except Exception as e:
                LOG.debug(
                    "An exception occurred when trying to create FlowLogs with S3 destination: %s",
                    e,
                )
                return CreateFlowLogsResult(
                    FlowLogIds=[],
                    Unsuccessful=[
                        UnsuccessfulItem(
                            Error=UnsuccessfulItemError(
                                Code="400",
                                Message=f"LogDestination: {bucket_name} does not exist",
                            ),
                            ResourceId=resource_id,
                        )
                        for resource_id in request.get("ResourceIds", [])
                    ],
                )

            response: CreateFlowLogsResult = call_moto_with_request(context, service_request)
            moto_backend = get_moto_backend(context)
            for flow_log_id in response["FlowLogIds"]:
                if flow_log := moto_backend.flow_logs.get(flow_log_id):
                    # just to be sure to not override another value, we only replace if it's the placeholder
                    flow_log.log_destination_type = flow_log.log_destination_type.replace(
                        "__placeholder__", "s3"
                    )
        else:
            response = call_moto(context)

        return response

    @handler("GetSecurityGroupsForVpc", expand=False)
    def get_security_groups_for_vpc(
        self,
        context: RequestContext,
        get_security_groups_for_vpc_request: GetSecurityGroupsForVpcRequest,
    ) -> GetSecurityGroupsForVpcResult:
        """List the security groups of the requested VPC.

        The groups are queried from the backend with a ``vpc-id`` filter; the result
        is returned without pagination (``NextToken`` is always ``None``).
        """
        vpc_id = get_security_groups_for_vpc_request.get("VpcId")
        backend = get_ec2_backend(context.account_id, context.region)
        filters = {"vpc-id": [vpc_id]}
        filtered_sgs = backend.describe_security_groups(filters=filters)

        sgs = [
            SecurityGroupForVpc(
                Description=sg.description,
                GroupId=sg.id,
                GroupName=sg.name,
                OwnerId=context.account_id,
                PrimaryVpcId=sg.vpc_id,
                Tags=[{"Key": tag.get("key"), "Value": tag.get("value")} for tag in sg.get_tags()],
            )
            for sg in filtered_sgs
        ]
        return GetSecurityGroupsForVpcResult(SecurityGroupForVpcs=sgs, NextToken=None)
1✔
566

567

568
@patch(SubnetBackend.modify_subnet_attribute)
def modify_subnet_attribute(fn, self, subnet_id, attr_name, attr_value):
    """Store the attributes listed in ``ADDITIONAL_SUBNET_ATTRS`` on the subnet itself.

    Any other attribute is handed to the patched original ``fn``. The subnet is
    looked up first in every case.
    """
    target_subnet = self.get_subnet(subnet_id)

    if attr_name not in ADDITIONAL_SUBNET_ATTRS:
        return fn(self, subnet_id, attr_name, attr_value)

    # private dns name options on launch is a dict with the keys
    # EnableResourceNameDnsARecord, EnableResourceNameDnsAAAARecord and HostnameType;
    # once it exists, new values are merged into it rather than replacing it
    merge_into_existing = attr_name == "private_dns_name_options_on_launch" and hasattr(
        target_subnet, attr_name
    )
    if merge_into_existing:
        getattr(target_subnet, attr_name).update(attr_value)
    else:
        setattr(target_subnet, attr_name, attr_value)
    return None
1✔
583

584

585
def get_moto_backend(context: RequestContext) -> EC2Backend:
    """Return moto's EC2 backend for the account and region of ``context``."""
    backends_of_account = ec2_backends[context.account_id]
    return backends_of_account[context.region]
1✔
588

589

590
@patch(Subnet.get_filter_value)
def get_filter_value(fn, self, filter_name):
    """Resolve the IPv6 CIDR association-id filters; defer all others to ``fn``."""
    association_id_filters = (
        "ipv6CidrBlockAssociationSet.associationId",
        "ipv6-cidr-block-association.association-id",
    )
    if filter_name not in association_id_filters:
        return fn(self, filter_name)
    return self.ipv6_cidr_block_associations
1✔
598

599

600
@patch(TransitGatewayAttachmentBackend.delete_transit_gateway_vpc_attachment)
def delete_transit_gateway_vpc_attachment(fn, self, transit_gateway_attachment_id, **kwargs):
    """Mark the attachment as ``"deleted"`` and return it.

    The patched original ``fn`` is not called, and the attachment stays in
    ``self.transit_gateway_attachments``.
    """
    attachment = self.transit_gateway_attachments.get(transit_gateway_attachment_id)
    attachment.state = "deleted"
    return attachment
1✔
605

606

607
@patch(FlowLogsBackend._validate_request)
def _validate_request(
    fn,
    self,
    log_group_name: str,
    log_destination: str,
    log_destination_type: str,
    max_aggregation_interval: str,
    deliver_logs_permission_arn: str,
) -> None:
    """Skip the original validation when a destination is given without a type.

    In every other case the arguments are passed through to the patched ``fn``.
    """
    # this is to fix the S3 destination issue, the validation will occur in the provider
    destination_without_type = bool(log_destination) and not log_destination_type
    if not destination_without_type:
        fn(
            self,
            log_group_name,
            log_destination,
            log_destination_type,
            max_aggregation_interval,
            deliver_logs_permission_arn,
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc