• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

intuit / Trapheus / 7ecf2541-e446-4544-a68f-78cc8fdb3252

04 Oct 2023 08:10AM UTC coverage: 95.298%. Remained the same
7ecf2541-e446-4544-a68f-78cc8fdb3252

push

circleci

github-actions[bot]
docs: Added README."fr".md translation via https://github.com/dephraiim/translate-readme

527 of 553 relevant lines covered (95.3%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

100.0
/src/restore/cluster_restore_function.py
1
import os
1✔
2
import boto3
1✔
3
import constants
1✔
4
import custom_exceptions
1✔
5
import utility as util
1✔
6

7
def lambda_restore_dbcluster(event, context):
    """Handles restore of a db cluster from its snapshot.

    Looks up the existing cluster named by ``event['identifier']``, restores
    a new cluster from a snapshot using the ids returned by
    ``util.get_modified_identifier``, then creates one db instance in the
    restored cluster for every member of the existing cluster.

    Args:
        event: mapping that carries the existing cluster id under the
            ``'identifier'`` key.
        context: not used by this handler.

    Returns:
        dict with ``'taskname'`` set to ``constants.CLUSTER_RESTORE`` and
        ``'identifier'`` set to the id of the restored cluster.

    Raises:
        custom_exceptions.RateExceededException: if the text of the
            underlying error contains ``constants.RATE_EXCEEDED``.
        custom_exceptions.ClusterRestoreException: for any other failure
            while describing, restoring or creating resources.
    """
    region = os.environ['Region']
    rds = boto3.client('rds', region)
    old_cluster_id = event['identifier']
    response = util.get_modified_identifier(old_cluster_id)
    cluster_id = response["instance_id"]
    cluster_snapshot_id = response["snapshot_id"]
    try:
        describe_db_response = rds.describe_db_clusters(
            DBClusterIdentifier=old_cluster_id
        )
        # Only the first returned cluster record is used throughout.
        cluster = describe_db_response['DBClusters'][0]
        engine = cluster['Engine']
        vpc_security_groups_ids = [
            vpc_security_group['VpcSecurityGroupId']
            for vpc_security_group in cluster['VpcSecurityGroups']
        ]

        restore_params = {
            'DBClusterIdentifier': cluster_id,
            'SnapshotIdentifier': cluster_snapshot_id,
            'Engine': engine,
            'EngineVersion': cluster['EngineVersion'],
            'DBSubnetGroupName': cluster['DBSubnetGroup'],
            'Port': cluster['Port'],
            'VpcSecurityGroupIds': vpc_security_groups_ids,
        }
        # 'DatabaseName' is only reported when an initial database was
        # specified at cluster creation; pass it through only when present
        # so clusters without one can still be restored.
        database_name = cluster.get('DatabaseName')
        if database_name:
            restore_params['DatabaseName'] = database_name
        rds.restore_db_cluster_from_snapshot(**restore_params)

        # Recreate every member instance inside the restored cluster, using
        # the instance class of the corresponding existing instance.
        for db_cluster_member in cluster['DBClusterMembers']:
            member_id = db_cluster_member['DBInstanceIdentifier']
            desc_db_response = rds.describe_db_instances(
                DBInstanceIdentifier=member_id
            )
            dbinstance_identifier = util.get_modified_identifier(member_id)["instance_id"]
            rds.create_db_instance(
                DBInstanceIdentifier=dbinstance_identifier,
                DBInstanceClass=desc_db_response['DBInstances'][0]['DBInstanceClass'],
                Engine=engine,
                DBClusterIdentifier=cluster_id)

        return {
            'taskname': constants.CLUSTER_RESTORE,
            'identifier': cluster_id,
        }
    except Exception as error:
        error_message = util.get_error_message(cluster_id, error)
        # Rate-limit failures get their own exception type so callers can
        # tell them apart from other restore failures.
        if constants.RATE_EXCEEDED in str(error):
            raise custom_exceptions.RateExceededException(error_message) from error
        raise custom_exceptions.ClusterRestoreException(error_message) from error
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc