• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

intuit / Trapheus / 7ecf2541-e446-4544-a68f-78cc8fdb3252

04 Oct 2023 08:10AM CUT coverage: 95.298%. Remained the same
7ecf2541-e446-4544-a68f-78cc8fdb3252

push

circleci

github-actions[bot]
docs: Added README."fr".md translation via https://github.com/dephraiim/translate-readme

527 of 553 relevant lines covered (95.3%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.44
/src/export/export_cluster_snapshot_s3_function.py
1
import os
1✔
2
import time
1✔
3

4
import boto3
1✔
5

6
import constants
1✔
7
import utility as util
1✔
8

9

10
def lambda_export_rds_cluster_snapshot_to_s3(event, context):
    """Start an export task that copies an RDS cluster snapshot to S3.

    The snapshot name is ``event['identifier']`` followed by
    ``constants.SNAPSHOT_POSTFIX``; the destination bucket name is
    ``constants.RDS_SNAPSHOTS_BUCKET_NAME_PREFIX`` followed by the AWS
    account id returned by ``util.get_aws_account_id()``.

    Args:
        event: Mapping that must contain the key ``'identifier'``.
        context: Lambda context object; not used here.

    Returns:
        dict with the keys ``'taskname'`` (``constants.EXPORT_SNAPSHOT``),
        ``'identifier'`` (the value of ``event['identifier']``) and
        ``'status'`` (the ``Status`` field of the ``start_export_task``
        response).

    Raises:
        KeyError: if the ``Region`` environment variable or
            ``event['identifier']`` is missing.
        Exception: any failure while starting the export task or reading
            its response is re-raised as a plain ``Exception`` carrying
            the original error as its cause. Errors raised by
            ``get_cluster_snapshot_arn`` propagate unchanged.
    """
    region = os.environ['Region']
    rds = boto3.client('rds', region)
    instance_id = event['identifier']

    # The epoch suffix gives every invocation its own export task identifier.
    export_id = f"{instance_id}-{int(time.time())}"
    snapshot_id = instance_id + constants.SNAPSHOT_POSTFIX
    snapshot_arn = get_cluster_snapshot_arn(snapshot_id)
    bucket_name = constants.RDS_SNAPSHOTS_BUCKET_NAME_PREFIX + util.get_aws_account_id()

    try:
        response = rds.start_export_task(
            ExportTaskIdentifier=export_id,
            SourceArn=snapshot_arn,
            S3BucketName=bucket_name,
            IamRoleArn=os.environ['SNAPSHOT_EXPORT_TASK_ROLE'],
            KmsKeyId=os.environ['SNAPSHOT_EXPORT_TASK_KEY'],
        )
        return {
            'taskname': constants.EXPORT_SNAPSHOT,
            'identifier': instance_id,
            'status': response['Status'],
        }
    except Exception as error:
        # Keep the generic Exception type (callers may match on it) but chain
        # the original error explicitly so its traceback is preserved.
        raise Exception(error) from error
×
36

37

38
def get_cluster_snapshot_arn(snapshot_name):
    """Return the ARN of a cluster snapshot if it is in the available state.

    Args:
        snapshot_name: Value passed as ``DBClusterSnapshotIdentifier`` to
            ``describe_db_cluster_snapshots``.

    Returns:
        The ``DBClusterSnapshotArn`` field of the single matching snapshot.

    Raises:
        KeyError: if the ``Region`` environment variable is missing.
        AssertionError: if the response's HTTP status code is not 200, or
            if the response does not contain exactly one snapshot.
        Exception: if the snapshot's ``Status`` is not ``'available'``.
    """
    region = os.environ['Region']
    rds = boto3.client('rds', region)
    snapshots_response = rds.describe_db_cluster_snapshots(DBClusterSnapshotIdentifier=snapshot_name)

    # Explicit raises instead of ``assert`` statements: asserts are removed
    # when Python runs with -O, which would silently skip these checks.
    # AssertionError is kept so existing callers see the same exception type.
    if snapshots_response['ResponseMetadata']['HTTPStatusCode'] != 200:
        raise AssertionError(f"Error fetching cluster snapshots: {snapshots_response}")

    snapshots = snapshots_response['DBClusterSnapshots']
    if len(snapshots) != 1:
        raise AssertionError(f"No snapshot matches name {snapshot_name}")

    snap = snapshots[0]
    snap_status = snap.get('Status')
    if snap_status != 'available':
        raise Exception(f"Snapshot is not available yet, status is {snap_status}")
    return snap['DBClusterSnapshotArn']
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc