• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uc-cdis / hatchery / 8299721104

15 Mar 2024 04:56PM UTC coverage: 19.95% (+0.5%) from 19.474%
8299721104

Pull #96

github

paulineribeyre
master - resolve conflicts
Pull Request #96: MIDRC-543 Nextflow image builder AMI

45 of 73 new or added lines in 3 files covered. (61.64%)

1 existing line in 1 file now uncovered.

1349 of 6762 relevant lines covered (19.95%)

1.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

7.13
/hatchery/nextflow.go
1
package hatchery
2

3
import (
4
        "encoding/base64"
5
        "errors"
6
        "fmt"
7
        "net"
8
        "os"
9
        "strings"
10
        "time"
11

12
        "github.com/apparentlymart/go-cidr/cidr"
13
        "github.com/aws/aws-sdk-go/aws"
14
        "github.com/aws/aws-sdk-go/aws/awserr"
15
        "github.com/aws/aws-sdk-go/aws/credentials/stscreds"
16
        "github.com/aws/aws-sdk-go/aws/session"
17
        "github.com/aws/aws-sdk-go/service/batch"
18
        "github.com/aws/aws-sdk-go/service/ec2"
19
        "github.com/aws/aws-sdk-go/service/iam"
20
        "github.com/aws/aws-sdk-go/service/imagebuilder"
21
        "github.com/aws/aws-sdk-go/service/s3"
22
)
23

24
/*
25
General TODOS:
26
- Make the AWS region configurable in the hatchery config (although ideally, the user should be able to choose)
27
- Make the `roleArn` configurable
28
- The contents of `s3://<nextflow bucket>/<username>` are not deleted because researchers may need to keep the intermediary files.
29
  We should set bucket lifecycle rules to delete after X days.
30
- Can we do this long setup as a separate workspace launch step, instead of in the launch() function?
31
*/
32

33
// create the AWS resources required to launch nextflow workflows
NEW
34
func createNextflowResources(userName string, nextflowGlobalConfig NextflowGlobalConfig, nextflowConfig NextflowConfig) (string, string, error) {
×
35
        var err error
×
36

×
37
        // credentials and AWS services init
×
38
        payModel, err := getCurrentPayModel(userName)
×
39
        if err != nil {
×
40
                return "", "", err
×
41
        }
×
42
        sess := session.Must(session.NewSession(&aws.Config{
×
43
                Region: aws.String("us-east-1"),
×
44
        }))
×
45
        awsAccountId, awsConfig, err := getNextflowAwsSettings(sess, payModel, userName, "creating")
×
46
        if err != nil {
×
47
                return "", "", err
×
48
        }
×
49
        Config.Logger.Printf("AWS account ID: '%v'", awsAccountId)
×
50
        batchSvc := batch.New(sess, &awsConfig)
×
51
        iamSvc := iam.New(sess, &awsConfig)
×
52
        s3Svc := s3.New(sess, &awsConfig)
×
53
        ec2Svc := ec2.New(sess, &awsConfig)
×
54

×
55
        userName = escapism(userName)
×
56
        hostname := strings.ReplaceAll(os.Getenv("GEN3_ENDPOINT"), ".", "-")
×
57

×
58
        // The bucket name is not user-specific, but each user only has access to their prefix (`/username/*`) inside
×
59
        // the bucket. Bucket names are globally unique, so we add the AWS account ID so that each AWS account connected
×
60
        // to the environment can have its own Nextflow bucket - eg 1 bucket in main account for blanket billing workspaces,
×
61
        // 1 bucket in userA's account for userA's direct pay workspace, etc.
×
62
        bucketName := fmt.Sprintf("%s-nf-%s", hostname, awsAccountId)
×
63

×
64
        // set the tags we will use on all created resources.
×
65
        // different services accept different formats
×
66
        // TODO The VPC, subnets, route tables and squid instance do not have the
×
67
        // same tag as the other resources, so we can't use the same tag to track
×
68
        // costs. To use the same tag, we might need to update `vpc.go`.
×
69
        tag := fmt.Sprintf("%s-hatchery-nf-%s", hostname, userName)
×
70
        // TODO Jawad mentioned we should add more tags. Ask him which ones are needed
×
71
        tagsMap := map[string]*string{
×
72
                "Name": &tag,
×
73
        }
×
74
        tags := []*iam.Tag{
×
75
                &iam.Tag{
×
76
                        Key:   aws.String("Name"),
×
77
                        Value: &tag,
×
78
                },
×
79
        }
×
80
        pathPrefix := aws.String(fmt.Sprintf("/%s/", tag))
×
81

×
82
        s3BucketWhitelistCondition := "" // if not configured, no buckets are allowed
×
83
        if len(nextflowConfig.S3BucketWhitelist) > 0 {
×
84
                s3BucketWhitelist := ""
×
85
                for _, bucket := range nextflowConfig.S3BucketWhitelist {
×
86
                        if s3BucketWhitelist != "" {
×
87
                                s3BucketWhitelist += ", "
×
88
                        }
×
89
                        s3BucketWhitelist += fmt.Sprintf("\"arn:aws:s3:::%s\", \"arn:aws:s3:::%s/*\"", bucket, bucket)
×
90
                }
91
                s3BucketWhitelistCondition = fmt.Sprintf(`,
×
92
                {
×
93
                        "Sid": "AllowWhitelistedBuckets",
×
94
                        "Effect": "Allow",
×
95
                        "Action": [
×
96
                                "s3:GetObject",
×
97
                                "s3:ListBucket"
×
98
                        ],
×
99
                        "Resource": [
×
100
                                %s
×
101
                        ]
×
102
                }`, s3BucketWhitelist)
×
103
        }
104

105
        // create the VPC if it doesn't exist
106
        vpcid, subnetids, err := setupVpcAndSquid(ec2Svc, userName, hostname)
×
107
        if err != nil {
×
108
                Config.Logger.Printf("Unable to setup VPC: %v", err)
×
109
                return "", "", err
×
110
        }
×
111

112
        // Create nextflow compute environment if it does not exist
NEW
113
        batchComputeEnvArn, err := createBatchComputeEnvironment(nextflowGlobalConfig, nextflowConfig, userName, hostname, tagsMap, batchSvc, ec2Svc, iamSvc, *vpcid, *subnetids, payModel, awsAccountId)
×
114
        if err != nil {
×
115
                Config.Logger.Printf("Error creating compute environment for user %s: %s", userName, err.Error())
×
116
                return "", "", err
×
117
        }
×
118

119
        // Create S3 bucket
120
        err = createS3bucket(s3Svc, bucketName)
×
121
        if err != nil {
×
122
                Config.Logger.Printf("Error creating S3 bucket '%s': %v", bucketName, err)
×
123
                return "", "", err
×
124
        }
×
125

126
        // create AWS batch job queue
127
        batchJobQueueName := fmt.Sprintf("%s-nf-job-queue-%s", hostname, userName)
×
128
        _, err = batchSvc.CreateJobQueue(&batch.CreateJobQueueInput{
×
129
                JobQueueName: &batchJobQueueName,
×
130
                ComputeEnvironmentOrder: []*batch.ComputeEnvironmentOrder{
×
131
                        {
×
132
                                ComputeEnvironment: &batchComputeEnvArn,
×
133
                                Order:              aws.Int64(int64(0)),
×
134
                        },
×
135
                },
×
136
                Priority: aws.Int64(int64(0)),
×
137
                Tags:     tagsMap,
×
138
        })
×
139
        if err != nil {
×
140
                if strings.Contains(err.Error(), "Object already exists") {
×
141
                        Config.Logger.Printf("Debug: Batch job queue '%s' already exists", batchJobQueueName)
×
142
                } else {
×
143
                        Config.Logger.Printf("Error creating Batch job queue '%s': %v", batchJobQueueName, err)
×
144
                        return "", "", err
×
145
                }
×
146
        } else {
×
147
                Config.Logger.Printf("Created Batch job queue '%s'", batchJobQueueName)
×
148
        }
×
149

150
        // create IAM policy for nextflow-created jobs
151
        policyName := fmt.Sprintf("%s-nf-jobs-%s", hostname, userName)
×
152
        nextflowJobsPolicyArn, err := createOrUpdatePolicy(iamSvc, policyName, pathPrefix, tags, aws.String(fmt.Sprintf(`{
×
153
                "Version": "2012-10-17",
×
154
                "Statement": [
×
155
                        {
×
156
                                "Sid": "AllowListingBucketFolder",
×
157
                                "Effect": "Allow",
×
158
                                "Action": [
×
159
                                        "s3:ListBucket"
×
160
                                ],
×
161
                                "Resource": [
×
162
                                        "arn:aws:s3:::%s"
×
163
                                ],
×
164
                                "Condition": {
×
165
                                        "StringLike": {
×
166
                                                "s3:prefix": [
×
167
                                                        "%s/*"
×
168
                                                ]
×
169
                                        }
×
170
                                }
×
171
                        },
×
172
                        {
×
173
                                "Sid": "AllowManagingBucketFolder",
×
174
                                "Effect": "Allow",
×
175
                                "Action": [
×
176
                                        "s3:GetObject",
×
177
                                        "s3:PutObject",
×
178
                                        "s3:DeleteObject"
×
179
                                ],
×
180
                                "Resource": [
×
181
                                        "arn:aws:s3:::%s/%s/*"
×
182
                                ]
×
183
                        }
×
184
                        %s
×
185
                ]
×
186
        }`, bucketName, userName, bucketName, userName, s3BucketWhitelistCondition)))
×
187
        if err != nil {
×
188
                return "", "", err
×
189
        }
×
190

191
        // create role for nextflow-created jobs
192
        roleName := truncateString(policyName, 64)
×
193
        roleResult, err := iamSvc.CreateRole(&iam.CreateRoleInput{
×
194
                RoleName: &roleName,
×
195
                AssumeRolePolicyDocument: aws.String(`{
×
196
                        "Version": "2012-10-17",
×
197
                        "Statement": [
×
198
                                {
×
199
                                        "Effect": "Allow",
×
200
                                        "Principal": {
×
201
                                                "Service": "ecs-tasks.amazonaws.com"
×
202
                                        },
×
203
                                        "Action": "sts:AssumeRole"
×
204
                                }
×
205
                        ]
×
206
                }`),
×
207
                Path: pathPrefix, // so we can use the path later to get the role ARN
×
208
                Tags: tags,
×
209
        })
×
210
        nextflowJobsRoleArn := ""
×
211
        if err != nil {
×
212
                if aerr, ok := err.(awserr.Error); ok {
×
213
                        if aerr.Code() == iam.ErrCodeEntityAlreadyExistsException {
×
214
                                Config.Logger.Printf("Debug: role '%s' already exists", roleName)
×
215
                                listRolesResult, err := iamSvc.ListRoles(&iam.ListRolesInput{
×
216
                                        PathPrefix: pathPrefix,
×
217
                                })
×
218
                                if err != nil || len(listRolesResult.Roles) == 0 {
×
219
                                        Config.Logger.Printf("Error getting existing role '%s': %v", roleName, err)
×
220
                                        return "", "", err
×
221
                                }
×
222
                                nextflowJobsRoleArn = *listRolesResult.Roles[0].Arn
×
223
                        } else {
×
224
                                Config.Logger.Printf("Error creating role '%s': %v", roleName, aerr)
×
225
                                return "", "", err
×
226
                        }
×
227
                } else {
×
228
                        Config.Logger.Printf("Error creating role '%s': %v", roleName, err)
×
229
                        return "", "", err
×
230
                }
×
231
        } else {
×
232
                Config.Logger.Printf("Created role '%s'", roleName)
×
233
                nextflowJobsRoleArn = *roleResult.Role.Arn
×
234
        }
×
235

236
        // attach policy to role for nextflow-created jobs
237
        _, err = iamSvc.AttachRolePolicy(&iam.AttachRolePolicyInput{
×
238
                PolicyArn: &nextflowJobsPolicyArn,
×
239
                RoleName:  &roleName,
×
240
        })
×
241
        if err != nil {
×
242
                Config.Logger.Printf("Error attaching policy '%s' to role '%s': %v", policyName, roleName, err)
×
243
                return "", "", err
×
244
        } else {
×
245
                Config.Logger.Printf("Attached policy '%s' to role '%s'", policyName, roleName)
×
246
        }
×
247

248
        // create IAM policy for nextflow client
249
        // Note: `batch:DescribeComputeEnvironments` is listed as required
250
        // in the Nextflow docs, but it seems to work fine without it.
251
        policyName = fmt.Sprintf("%s-nf-%s", hostname, userName)
×
252
        jobImageWhitelistCondition := "" // if not configured, all images are allowed
×
253
        if len(nextflowConfig.JobImageWhitelist) > 0 {
×
254
                jobImageWhitelist := fmt.Sprintf(`"%v"`, strings.Join(replaceAllUsernamePlaceholders(nextflowConfig.JobImageWhitelist, userName), "\", \""))
×
255
                jobImageWhitelistCondition = fmt.Sprintf(`,
×
256
                "Condition": {
×
257
                        "StringLike": {
×
258
                                "batch:Image": [
×
259
                                        %s
×
260
                                ]
×
261
                        }
×
262
                }`, jobImageWhitelist)
×
263
        }
×
264
        nextflowPolicyArn, err := createOrUpdatePolicy(iamSvc, policyName, pathPrefix, tags, aws.String(fmt.Sprintf(`{
×
265
                "Version": "2012-10-17",
×
266
                "Statement": [
×
267
                        {
×
268
                                "Sid": "AllowPassingNextflowJobsRole",
×
269
                                "Effect": "Allow",
×
270
                                "Action": [
×
271
                                        "iam:PassRole"
×
272
                                ],
×
273
                                "Resource": [
×
274
                                        "%s"
×
275
                                ]
×
276
                        },
×
277
                        {
×
278
                                "Sid": "AllowBatchActionsWithGranularAuthz",
×
279
                                "Effect": "Allow",
×
280
                                "Action": [
×
281
                                        "batch:DescribeJobQueues",
×
282
                                        "batch:ListJobs",
×
283
                                        "batch:SubmitJob",
×
284
                                        "batch:CancelJob",
×
285
                                        "batch:TerminateJob"
×
286
                                ],
×
287
                                "Resource": [
×
288
                                        "arn:aws:batch:*:*:job-definition/*",
×
289
                                        "arn:aws:batch:*:*:job-queue/%s"
×
290
                                ]
×
291
                        },
×
292
                        {
×
293
                                "Sid": "AllowBatchActionsWithoutGranularAuthz",
×
294
                                "Effect": "Allow",
×
295
                                "Action": [
×
296
                                        "batch:DescribeJobs",
×
297
                                        "batch:DescribeJobDefinitions"
×
298
                                ],
×
299
                                "Resource": [
×
300
                                        "*"
×
301
                                ]
×
302
                        },
×
303
                        {
×
304
                                "Sid": "AllowWhitelistedImages",
×
305
                                "Effect": "Allow",
×
306
                                "Action": [
×
307
                                        "batch:RegisterJobDefinition"
×
308
                                ],
×
309
                                "Resource": [
×
310
                                        "arn:aws:batch:*:*:job-definition/*"
×
311
                                ]
×
312
                                %s
×
313
                        },
×
314
                        {
×
315
                                "Sid": "AllowListingBucketFolder",
×
316
                                "Effect": "Allow",
×
317
                                "Action": [
×
318
                                        "s3:ListBucket"
×
319
                                ],
×
320
                                "Resource": [
×
321
                                        "arn:aws:s3:::%s"
×
322
                                ],
×
323
                                "Condition": {
×
324
                                        "StringLike": {
×
325
                                                "s3:prefix": [
×
326
                                                        "%s/*"
×
327
                                                ]
×
328
                                        }
×
329
                                }
×
330
                        },
×
331
                        {
×
332
                                "Sid": "AllowManagingBucketFolder",
×
333
                                "Effect": "Allow",
×
334
                                "Action": [
×
335
                                        "s3:GetObject",
×
336
                                        "s3:PutObject",
×
337
                                        "s3:DeleteObject"
×
338
                                ],
×
339
                                "Resource": [
×
340
                                        "arn:aws:s3:::%s/%s/*"
×
341
                                ]
×
342
                        }
×
343
                        %s
×
344
                ]
×
345
        }`, nextflowJobsRoleArn, batchJobQueueName, jobImageWhitelistCondition, bucketName, userName, bucketName, userName, s3BucketWhitelistCondition)))
×
346
        if err != nil {
×
347
                return "", "", err
×
348
        }
×
349

350
        // create user for nextflow client
351
        nextflowUserName := fmt.Sprintf("%s-nf-%s", hostname, userName)
×
352
        _, err = iamSvc.CreateUser(&iam.CreateUserInput{
×
353
                UserName: &nextflowUserName,
×
354
                Tags:     tags,
×
355
        })
×
356
        if err != nil {
×
357
                if strings.Contains(err.Error(), "EntityAlreadyExists") {
×
358
                        Config.Logger.Printf("Debug: user '%s' already exists", nextflowUserName)
×
359

×
360
                        // delete any existing access keys to avoid `LimitExceeded: Cannot exceed
×
361
                        // quota for AccessKeysPerUser: 2` error
×
362
                        err = deleteUserAccessKeys(nextflowUserName, iamSvc)
×
363
                        if err != nil {
×
364
                                Config.Logger.Printf("Unable to delete access keys for user '%s': %v", nextflowUserName, err)
×
365
                                return "", "", err
×
366
                        }
×
367

368
                } else {
×
369
                        Config.Logger.Printf("Error creating user '%s': %v", nextflowUserName, err)
×
370
                        return "", "", err
×
371
                }
×
372
        } else {
×
373
                Config.Logger.Printf("Created user '%s'", nextflowUserName)
×
374
        }
×
375

376
        // attach policy to user for nextflow client
377
        _, err = iamSvc.AttachUserPolicy(&iam.AttachUserPolicyInput{
×
378
                UserName:  &nextflowUserName,
×
379
                PolicyArn: &nextflowPolicyArn,
×
380
        })
×
381
        if err != nil {
×
382
                Config.Logger.Printf("Error attaching policy '%s' to user '%s': %v", policyName, nextflowUserName, err)
×
383
                return "", "", err
×
384
        } else {
×
385
                Config.Logger.Printf("Attached policy '%s' to user '%s'", policyName, nextflowUserName)
×
386
        }
×
387

388
        // create access key for the nextflow user
389
        accessKeyResult, err := iamSvc.CreateAccessKey(&iam.CreateAccessKeyInput{
×
390
                UserName: &nextflowUserName,
×
391
        })
×
392
        if err != nil {
×
393
                Config.Logger.Printf("Error creating access key for user '%s': %v", nextflowUserName, err)
×
394
                return "", "", err
×
395
        }
×
396
        keyId := *accessKeyResult.AccessKey.AccessKeyId
×
397
        keySecret := *accessKeyResult.AccessKey.SecretAccessKey
×
398
        Config.Logger.Printf("Created access key '%v' for user '%s'", keyId, nextflowUserName)
×
399

×
400
        return keyId, keySecret, nil
×
401
}
402

403
func getNextflowAwsSettings(sess *session.Session, payModel *PayModel, userName string, action string) (string, aws.Config, error) {
7✔
404
        // credentials and AWS services init
7✔
405
        var awsConfig aws.Config
7✔
406
        var awsAccountId string
7✔
407
        if payModel != nil && payModel.Ecs {
10✔
408
                Config.Logger.Printf("Info: pay model enabled for user '%s': %s Nextflow resources in user's AWS account", userName, action)
3✔
409
                roleArn := fmt.Sprintf("arn:aws:iam::%s:role/csoc_adminvm", payModel.AWSAccountId)
3✔
410
                awsConfig = aws.Config{
3✔
411
                        Credentials: stscreds.NewCredentials(sess, roleArn),
3✔
412
                }
3✔
413
                awsAccountId = payModel.AWSAccountId
3✔
414
        } else {
7✔
415
                Config.Logger.Printf("Info: pay model disabled for user '%s': %s Nextflow resources in main AWS account", userName, action)
4✔
416
                awsConfig = aws.Config{}
4✔
417
                Config.Logger.Printf("Debug: Getting AWS account ID...")
4✔
418
                awsAccountId, err := getAwsAccountId(sess, &awsConfig)
4✔
419
                if err != nil {
8✔
420
                        Config.Logger.Printf("Error getting AWS account ID: %v", err)
4✔
421
                        return awsAccountId, awsConfig, err
4✔
422
                }
4✔
423
        }
424
        return awsAccountId, awsConfig, nil
3✔
425
}
426

427
// Create VPC for aws batch compute environment
428
func setupVpcAndSquid(ec2Svc *ec2.EC2, userName string, hostname string) (*string, *[]string, error) {
×
429
        // TODO: make base CIDR configurable?
×
430
        cidrstring := "192.168.0.0/16"
×
431
        _, IPNet, _ := net.ParseCIDR(cidrstring)
×
432
        numberOfSubnets := 3
×
433
        // subnet cidr ranges in array
×
434
        subnets := []string{}
×
435
        subnetIds := []string{}
×
436

×
437
        // loop over the number of subnets and create them
×
438
        for i := 0; i < numberOfSubnets; i++ {
×
439
                subnet, err := cidr.Subnet(IPNet, 2, i)
×
440
                if err != nil {
×
441
                        return nil, nil, err
×
442
                }
×
443
                subnetString := subnet.String()
×
444
                subnets = append(subnets, subnetString)
×
445
        }
446

447
        // create the VPC
448
        // The VPC is per-user because the Squid architecture would not work with multiple users sharing a VPC, as
449
        // it follows the lifecycle of the workspace. Idle VPCs don’t cost anything so we can create one per user.
450
        vpcName := fmt.Sprintf("%s-nf-vpc-%s", hostname, userName)
×
451
        vpc, err := ec2Svc.DescribeVpcs(&ec2.DescribeVpcsInput{
×
452
                Filters: []*ec2.Filter{
×
453
                        {
×
454
                                Name:   aws.String("cidr"),
×
455
                                Values: []*string{aws.String(cidrstring)},
×
456
                        },
×
457
                        {
×
458
                                Name:   aws.String("tag:Name"),
×
459
                                Values: []*string{aws.String(vpcName)},
×
460
                        },
×
461
                        {
×
462
                                Name:   aws.String("tag:Environment"),
×
463
                                Values: []*string{aws.String(os.Getenv("GEN3_ENDPOINT"))},
×
464
                        },
×
465
                },
×
466
        })
×
467
        if err != nil {
×
468
                return nil, nil, err
×
469
        }
×
470
        vpcid := ""
×
471
        // TODO: Check that the VPC is configured correctly, and not just that it exists
×
472
        if len(vpc.Vpcs) == 0 {
×
473
                Config.Logger.Print("Debug: VPC does not exist, creating it now")
×
474
                vpc, err := createVPC(cidrstring, vpcName, ec2Svc)
×
475
                if err != nil {
×
476
                        return nil, nil, err
×
477
                }
×
478
                Config.Logger.Printf("Debug: Created VPC '%s'", vpcName)
×
479

×
480
                vpcid = *vpc.Vpc.VpcId
×
481
        } else {
×
482
                vpcid = *vpc.Vpcs[0].VpcId
×
483
        }
×
484

485
        // create internet gateway
486
        igw, err := createInternetGW(vpcName, vpcid, ec2Svc)
×
487
        if err != nil {
×
488
                return nil, nil, err
×
489
        }
×
490

491
        // create subnets
492
        for i, subnet := range subnets {
×
493
                subnetName := fmt.Sprintf("%s-nf-subnet-%s-%d", hostname, userName, i)
×
494
                subnetId, err := setupSubnet(subnetName, subnet, vpcid, ec2Svc)
×
495
                if err != nil {
×
496
                        return nil, nil, err
×
497
                }
×
498
                subnetIds = append(subnetIds, *subnetId)
×
499
        }
500

501
        // setup route table for regular subnets
502
        routeTableId, err := setupRouteTable(hostname, userName, ec2Svc, vpcid, *igw, fmt.Sprintf("%s-nf-rt-%s", hostname, userName))
×
503
        if err != nil {
×
504
                return nil, nil, err
×
505
        }
×
506

507
        // setup route table for Squid subnet
508
        fwRouteTableId, err := setupRouteTable(hostname, userName, ec2Svc, vpcid, *igw, fmt.Sprintf("%s-nf-fw-rt-%s", hostname, userName))
×
509
        if err != nil {
×
510
                return nil, nil, err
×
511
        }
×
512

513
        // associate subnets with route table
514
        err = associateRouteTablesToSubnets(ec2Svc, subnetIds, *routeTableId)
×
515
        if err != nil {
×
516
                return nil, nil, err
×
517
        }
×
518

519
        // setup Squid
520
        fwSubnetId, err := setupSquid(hostname, userName, cidrstring, ec2Svc, vpcid, igw, fwRouteTableId, routeTableId)
×
521
        if err != nil {
×
522
                return nil, nil, err
×
523
        }
×
524
        Config.Logger.Printf("Debug: Created Squid '%s'", *fwSubnetId)
×
525

×
526
        Config.Logger.Print("Debug: Nextflow VPC setup complete")
×
527
        return &vpcid, &subnetIds, nil
×
528
}
529

530
// Function to make sure launch template is created, and configured correctly
531
// We need a launch template since we need a user data script to authenticate with private ECR repositories
532
func ensureLaunchTemplate(ec2Svc *ec2.EC2, userName string, hostname string, jobImageWhitelist []string) (*string, error) {
×
533

×
534
        // user data script to authenticate with private ECR repositories
×
535
        userData := generateEcrLoginUserData(jobImageWhitelist, userName)
×
536

×
537
        launchTemplateName := fmt.Sprintf("%s-nf-%s", hostname, userName)
×
538

×
539
        Config.Logger.Printf("Debug: Launch template name: %s", launchTemplateName)
×
540

×
541
        // create launch template
×
542
        launchTemplate, err := ec2Svc.DescribeLaunchTemplates(&ec2.DescribeLaunchTemplatesInput{
×
543
                LaunchTemplateNames: []*string{
×
544
                        aws.String(launchTemplateName),
×
545
                },
×
546
        })
×
547
        if err != nil {
×
548
                // If no launch template exists, create it
×
549
                if aerr, ok := err.(awserr.Error); ok && aerr.Code() == "InvalidLaunchTemplateName.NotFoundException" {
×
550
                        Config.Logger.Printf("Debug: Launch template '%s' does not exist, creating it", launchTemplateName)
×
551
                        launchTemplate, err := ec2Svc.CreateLaunchTemplate(&ec2.CreateLaunchTemplateInput{
×
552
                                LaunchTemplateName: aws.String(launchTemplateName),
×
553
                                LaunchTemplateData: &ec2.RequestLaunchTemplateData{
×
554
                                        UserData: aws.String(userData),
×
555
                                },
×
556
                        })
×
557
                        if err != nil {
×
558
                                Config.Logger.Printf("Error creating launch template '%s': %v", launchTemplateName, err)
×
559
                                return nil, err
×
560
                        }
×
561
                        Config.Logger.Printf("Debug: Created launch template '%s'", launchTemplateName)
×
562
                        return launchTemplate.LaunchTemplate.LaunchTemplateName, nil
×
563
                } else {
×
564
                        Config.Logger.Printf("Error describing launch template '%s': %v", launchTemplateName, err)
×
565
                }
×
566
                return nil, err
×
567
        }
568

569
        if len(launchTemplate.LaunchTemplates) == 1 {
×
570
                // TODO: Make sure user data in the existing launch template matches the user data we want
×
571
                Config.Logger.Printf("Debug: Launch template '%s' already exists", launchTemplateName)
×
572
                return launchTemplate.LaunchTemplates[0].LaunchTemplateName, nil
×
573
        }
×
574
        return nil, fmt.Errorf("more than one launch template with the same name exist: %v", launchTemplate.LaunchTemplates)
×
575
}
576

577
// Create AWS Batch compute environment
NEW
578
func createBatchComputeEnvironment(nextflowGlobalConfig NextflowGlobalConfig, nextflowConfig NextflowConfig, userName string, hostname string, tagsMap map[string]*string, batchSvc *batch.Batch, ec2Svc *ec2.EC2, iamSvc *iam.IAM, vpcid string, subnetids []string, payModel *PayModel, awsAccountId string) (string, error) {
×
579
        instanceProfileArn, err := createEcsInstanceProfile(iamSvc, fmt.Sprintf("%s-nf-ecsInstanceRole", hostname))
×
580
        if err != nil {
×
581
                Config.Logger.Printf("Unable to create ECS instance profile: %s", err.Error())
×
582
                return "", err
×
583
        }
×
584

585
        // the launch template for the compute envrionment must be user-specific as well
586
        launchTemplateName, err := ensureLaunchTemplate(ec2Svc, userName, hostname, nextflowConfig.JobImageWhitelist)
×
587
        if err != nil {
×
588
                return "", err
×
589
        }
×
590

591
        // the compute environment must be user-specific as well, since it's in the user-specific VPC
592
        batchComputeEnvName := fmt.Sprintf("%s-nf-compute-env-%s", hostname, userName)
×
593

×
594
        // Check if batch compute env exists, if it does return it
×
595
        batchComputeEnv, err := batchSvc.DescribeComputeEnvironments(&batch.DescribeComputeEnvironmentsInput{
×
596
                ComputeEnvironments: []*string{
×
597
                        aws.String(batchComputeEnvName),
×
598
                },
×
599
        })
×
600
        if err != nil {
×
601
                return "", err
×
602
        }
×
603

604
        // Configure the specified AMI. At the time of writing, CPU workflows launch on ECS_AL2 (default for all
605
        // non-GPU instances) and GPU workflows on ECS_AL2_NVIDIA (default for all GPU instances). Setting the AMI
606
        // for both types is easier than switching the image type based on which AMI (CPU or GPU) is configured.
NEW
607
        ami, err := getNextflowInstanceAmi(nextflowGlobalConfig.ImageBuilderReaderRoleArn, nextflowConfig, nil)
×
NEW
608
        if err != nil {
×
NEW
609
                return "", err
×
NEW
610
        }
×
611
        ec2Configuration := []*batch.Ec2Configuration{
×
612
                {
×
NEW
613
                        ImageIdOverride: aws.String(ami),
×
614
                        ImageType:       aws.String("ECS_AL2"),
×
615
                },
×
616
                {
×
NEW
617
                        ImageIdOverride: aws.String(ami),
×
618
                        ImageType:       aws.String("ECS_AL2_NVIDIA"),
×
619
                },
×
620
        }
×
621

×
622
        var batchComputeEnvArn string
×
623
        if len(batchComputeEnv.ComputeEnvironments) > 0 {
×
624
                Config.Logger.Printf("Debug: Batch compute environment '%s' already exists, updating it", batchComputeEnvName)
×
625
                batchComputeEnvArn = *batchComputeEnv.ComputeEnvironments[0].ComputeEnvironmentArn
×
626

×
627
                // wait for the compute env to be ready to be updated
×
628
                err = waitForBatchComputeEnvironment(batchComputeEnvName, batchSvc, false)
×
629
                if err != nil {
×
630
                        return "", err
×
631
                }
×
632

633
                // update any settings that may have changed in the config
634
                // TODO also make sure it is pointing at the correct subnets - if the VPC is deleted,
635
                // we should recreate the compute environment as well because it will be pointing at old vpc subnets
636
                _, err = batchSvc.UpdateComputeEnvironment(&batch.UpdateComputeEnvironmentInput{
×
637
                        ComputeEnvironment: &batchComputeEnvArn,
×
638
                        State:              aws.String("ENABLED"), // since the env already exists, make sure it's enabled
×
639
                        ComputeResources: &batch.ComputeResourceUpdate{
×
640
                                Ec2Configuration: ec2Configuration,
×
641
                                LaunchTemplate: &batch.LaunchTemplateSpecification{
×
642
                                        LaunchTemplateName: launchTemplateName,
×
643
                                        Version:            aws.String("$Latest"),
×
644
                                },
×
645
                                InstanceRole:       instanceProfileArn,
×
646
                                AllocationStrategy: aws.String("BEST_FIT_PROGRESSIVE"),
×
647
                                MinvCpus:           aws.Int64(int64(nextflowConfig.InstanceMinVCpus)),
×
648
                                MaxvCpus:           aws.Int64(int64(nextflowConfig.InstanceMaxVCpus)),
×
649
                                InstanceTypes:      []*string{aws.String(nextflowConfig.InstanceType)},
×
650
                                Type:               aws.String(nextflowConfig.ComputeEnvironmentType),
×
651
                                Tags:               tagsMap,
×
652
                        },
×
653
                        UpdatePolicy: &batch.UpdatePolicy{
×
654
                                // existing jobs are not terminated and keep running for up to 30 min after this update
×
655
                                JobExecutionTimeoutMinutes: aws.Int64(30),
×
656
                                TerminateJobsOnUpdate:      aws.Bool(false),
×
657
                        },
×
658
                })
×
659
                if err != nil {
×
660
                        Config.Logger.Printf("Unable to update Batch compute environment '%s': %v", batchComputeEnvName, err)
×
661
                        return "", err
×
662
                }
×
663
        } else { // compute environment does not exist, create it
×
664
                subnets := []*string{}
×
665
                for _, subnet := range subnetids {
×
666
                        s := subnet
×
667
                        subnets = append(subnets, &s)
×
668
                }
×
669

670
                // Get the default security group for the VPC
671
                securityGroup, err := ec2Svc.DescribeSecurityGroups(&ec2.DescribeSecurityGroupsInput{
×
672
                        Filters: []*ec2.Filter{
×
673
                                {
×
674
                                        Name:   aws.String("vpc-id"),
×
675
                                        Values: []*string{aws.String(vpcid)},
×
676
                                },
×
677
                                {
×
678
                                        Name:   aws.String("group-name"),
×
679
                                        Values: []*string{aws.String("default")},
×
680
                                },
×
681
                        },
×
682
                })
×
683
                if err != nil {
×
684
                        return "", err
×
685
                }
×
686
                securityGroupId := securityGroup.SecurityGroups[0].GroupId
×
687

×
688
                batchComputeEnvResult, err := batchSvc.CreateComputeEnvironment(&batch.CreateComputeEnvironmentInput{
×
689
                        ComputeEnvironmentName: &batchComputeEnvName,
×
690
                        Type:                   aws.String("MANAGED"),
×
691
                        ComputeResources: &batch.ComputeResource{
×
692
                                Ec2Configuration: ec2Configuration,
×
693
                                LaunchTemplate: &batch.LaunchTemplateSpecification{
×
694
                                        LaunchTemplateName: launchTemplateName,
×
695
                                        Version:            aws.String("$Latest"),
×
696
                                },
×
697
                                InstanceRole:       instanceProfileArn,
×
698
                                AllocationStrategy: aws.String("BEST_FIT_PROGRESSIVE"),
×
699
                                MinvCpus:           aws.Int64(int64(nextflowConfig.InstanceMinVCpus)),
×
700
                                MaxvCpus:           aws.Int64(int64(nextflowConfig.InstanceMaxVCpus)),
×
701
                                InstanceTypes:      []*string{aws.String(nextflowConfig.InstanceType)},
×
702
                                SecurityGroupIds:   []*string{securityGroupId},
×
703
                                Subnets:            subnets,
×
704
                                Type:               aws.String(nextflowConfig.ComputeEnvironmentType),
×
705
                                Tags:               tagsMap,
×
706
                        },
×
707
                        Tags: tagsMap,
×
708
                })
×
709
                if err != nil {
×
710
                        return "", err
×
711
                }
×
712

713
                Config.Logger.Printf("Debug: Created AWS Batch compute environment '%s'", batchComputeEnvName)
×
714
                batchComputeEnvArn = *batchComputeEnvResult.ComputeEnvironmentArn
×
715
        }
716

717
        // the compute environment must be "VALID" before we can create the job queue: wait until ready
718
        err = waitForBatchComputeEnvironment(batchComputeEnvName, batchSvc, true)
×
719
        if err != nil {
×
720
                return "", err
×
721
        }
×
722

723
        return batchComputeEnvArn, nil
×
724
}
725

726
func waitForBatchComputeEnvironment(batchComputeEnvName string, batchSvc *batch.Batch, mustBeValid bool) error {
×
727
        maxIter := 6
×
728
        iterDelaySecs := 5
×
729
        var compEnvStatus string
×
730
        for i := 0; ; i++ {
×
731
                batchComputeEnvs, err := batchSvc.DescribeComputeEnvironments(&batch.DescribeComputeEnvironmentsInput{
×
732
                        ComputeEnvironments: []*string{
×
733
                                aws.String(batchComputeEnvName),
×
734
                        },
×
735
                })
×
736
                if err != nil {
×
737
                        return err
×
738
                }
×
739
                compEnvStatus = *batchComputeEnvs.ComputeEnvironments[0].Status
×
740
                // possible statuses: CREATING | UPDATING | DELETING | DELETED | VALID | INVALID
×
741
                if compEnvStatus == "VALID" {
×
742
                        Config.Logger.Print("Debug: Compute environment is ready")
×
743
                        break
×
744
                }
745
                if !mustBeValid && compEnvStatus == "INVALID" {
×
746
                        Config.Logger.Printf("Debug: Compute environment is %s and can't be used, but can be updated", compEnvStatus)
×
747
                        break
×
748
                }
749
                if i == maxIter {
×
750
                        return fmt.Errorf("compute environment is not ready after %v seconds. Exiting", maxIter*iterDelaySecs)
×
751
                }
×
752
                Config.Logger.Printf("Info: Compute environment is %s, waiting %vs and checking again", compEnvStatus, iterDelaySecs)
×
753
                time.Sleep(time.Duration(iterDelaySecs) * time.Second)
×
754
        }
755
        return nil
×
756
}
757

758
// Create IAM role for AWS Batch compute environment
759
func createEcsInstanceProfile(iamSvc *iam.IAM, name string) (*string, error) {
×
760
        Config.Logger.Printf("Debug: Creating ECS instance profile '%s'", name)
×
761

×
762
        instanceProfile, err := iamSvc.GetInstanceProfile(&iam.GetInstanceProfileInput{
×
763
                InstanceProfileName: aws.String(name),
×
764
        })
×
765
        if err != nil {
×
766
                if aerr, ok := err.(awserr.Error); ok && aerr.Code() == iam.ErrCodeNoSuchEntityException {
×
767
                        Config.Logger.Printf("Debug: Instance profile '%s' does not exist, creating it", name)
×
768
                        _, err = iamSvc.CreateInstanceProfile(&iam.CreateInstanceProfileInput{
×
769
                                InstanceProfileName: aws.String(name),
×
770
                        })
×
771
                        if err != nil {
×
772
                                return nil, err
×
773
                        }
×
774
                }
775
                return nil, err
×
776
        }
777

778
        // Create the IAM role
779
        Config.Logger.Printf("Debug: Creating IAM role '%s'", name)
×
780
        rolePolicy := `{
×
781
                "Version": "2012-10-17",
×
782
                "Statement": [
×
783
                  {
×
784
                        "Effect": "Allow",
×
785
                        "Principal": { "Service": "ec2.amazonaws.com"},
×
786
                        "Action": "sts:AssumeRole"
×
787
                  }
×
788
                ]
×
789
          }`
×
790
        _, err = iamSvc.CreateRole(&iam.CreateRoleInput{
×
791
                AssumeRolePolicyDocument: aws.String(rolePolicy),
×
792
                RoleName:                 aws.String(name),
×
793
        })
×
794
        if err != nil {
×
795
                if aerr, ok := err.(awserr.Error); ok && aerr.Code() == iam.ErrCodeEntityAlreadyExistsException {
×
796
                        Config.Logger.Printf("Debug: Role '%s' already exists, assuming it is already linked to instance profile and continuing", name)
×
797
                        return instanceProfile.InstanceProfile.Arn, nil
×
798
                } else {
×
799
                        Config.Logger.Printf("Unable to create IAM role '%s': %v", name, err)
×
800
                        return nil, err
×
801
                }
×
802
        }
803

804
        // Attach policy to the role
805
        _, err = iamSvc.AttachRolePolicy(&iam.AttachRolePolicyInput{
×
806
                PolicyArn: aws.String("arn:aws:iam::aws:policy/service-role/AmazonEC2ContainerServiceforEC2Role"),
×
807
                RoleName:  aws.String(name),
×
808
        })
×
809
        if err != nil {
×
810
                return nil, err
×
811
        }
×
812

813
        _, err = iamSvc.AddRoleToInstanceProfile(&iam.AddRoleToInstanceProfileInput{
×
814
                InstanceProfileName: aws.String(name),
×
815
                RoleName:            aws.String(name),
×
816
        })
×
817
        if err != nil {
×
818
                Config.Logger.Printf("Unable to add role '%s' to instance profile '%s': %s", name, name, err.Error())
×
819
                return nil, err
×
820
        }
×
821

822
        Config.Logger.Printf("Info: Set up ECS instance profile '%s'", name)
×
823
        return instanceProfile.InstanceProfile.Arn, nil
×
824
}
825

826
func createS3bucket(s3Svc *s3.S3, bucketName string) error {
×
827
        // create S3 bucket for nextflow input, output and intermediate files
×
828
        _, err := s3Svc.CreateBucket(&s3.CreateBucketInput{
×
829
                Bucket: &bucketName,
×
830
                // TODO We may need to add the LocationConstraint below if we change the region to not
×
831
                // "us-east-1". It seems this block causes an error when the region is "us-east-1", so
×
832
                // it would need to be added conditionally.
×
833
                // CreateBucketConfiguration: &s3.CreateBucketConfiguration{
×
834
                //         LocationConstraint: aws.String("us-east-1"),
×
835
                // },
×
836
        })
×
837
        if err != nil {
×
838
                // no need to check for a specific "bucket already exists" error since
×
839
                // `s3Svc.CreateBucket` does not error when the bucket exists
×
840
                Config.Logger.Printf("Error creating S3 bucket '%s': %v", bucketName, err)
×
841
                return err
×
842
        }
×
843

844
        Config.Logger.Printf("Created S3 bucket '%s'", bucketName)
×
845
        return nil
×
846
}
847

848
// Function to set up squid and subnets for squid
849
func setupSquid(hostname string, userName string, cidrstring string, ec2svc *ec2.EC2, vpcid string, igw *string, fwRouteTableId *string, routeTableId *string) (*string, error) {
×
850
        _, IPNet, _ := net.ParseCIDR(cidrstring)
×
851
        subnet, err := cidr.Subnet(IPNet, 2, 3)
×
852
        if err != nil {
×
853
                return nil, err
×
854
        }
×
855
        subnetString := subnet.String()
×
856

×
857
        // create subnet
×
858
        subnetName := fmt.Sprintf("%s-nf-subnet-fw-%s", hostname, userName)
×
859
        Config.Logger.Printf("Debug: Creating subnet '%s' with name '%s'", subnet, subnetName)
×
860

×
861
        subnetId, err := setupSubnet(subnetName, subnetString, vpcid, ec2svc)
×
862
        if err != nil {
×
863
                return nil, err
×
864
        }
×
865

866
        // add route to internet gateway
867
        Config.Logger.Printf("Debug: Creating route to internet '%s' in route table '%s'", *igw, *fwRouteTableId)
×
868
        _, err = ec2svc.CreateRoute(&ec2.CreateRouteInput{
×
869
                DestinationCidrBlock: aws.String("0.0.0.0/0"),
×
870
                GatewayId:            igw,
×
871
                RouteTableId:         fwRouteTableId,
×
872
        })
×
873
        if err != nil {
×
874
                return nil, err
×
875
        }
×
876

877
        // associate route table to subnet
878
        _, err = ec2svc.AssociateRouteTable(&ec2.AssociateRouteTableInput{
×
879
                RouteTableId: fwRouteTableId,
×
880
                SubnetId:     subnetId,
×
881
        })
×
882
        if err != nil {
×
883
                return nil, err
×
884
        }
×
885
        Config.Logger.Printf("Debug: Associated route table '%s' to subnet '%s'", *fwRouteTableId, *subnetId)
×
886

×
887
        // launch squid
×
888
        squidInstanceId, err := launchSquidInstance(hostname, userName, ec2svc, subnetId, vpcid, subnetString)
×
889
        if err != nil {
×
890
                return nil, err
×
891
        }
×
892

893
        Config.Logger.Printf("Debug: Will add route to Squid '%s' in route table '%s'", *squidInstanceId, *routeTableId)
×
894
        // add or replace route to squid
×
895
        _, err = ec2svc.CreateRoute(&ec2.CreateRouteInput{
×
896
                DestinationCidrBlock: aws.String("0.0.0.0/0"),
×
897
                InstanceId:           squidInstanceId,
×
898
                RouteTableId:         routeTableId,
×
899
        })
×
900

×
901
        if err != nil {
×
902
                if aerr, ok := err.(awserr.Error); ok {
×
903
                        // Note: code `IncorrectInstanceState` should never happen here, because `launchSquidInstance`
×
904
                        // waits until the instance is ready.
×
905
                        if aerr.Code() == "RouteAlreadyExists" {
×
906
                                // the route already exists, replace it
×
907
                                Config.Logger.Print("Debug: Route already exists, replacing it")
×
908
                                _, err = ec2svc.ReplaceRoute(&ec2.ReplaceRouteInput{
×
909
                                        DestinationCidrBlock: aws.String("0.0.0.0/0"),
×
910
                                        InstanceId:           squidInstanceId,
×
911
                                        RouteTableId:         routeTableId,
×
912
                                })
×
913
                                if err != nil {
×
914
                                        return nil, err
×
915
                                }
×
916
                        } else {
×
917
                                return nil, err
×
918
                        }
×
919
                } else {
×
920
                        return nil, err
×
921
                }
×
922
        }
923

924
        Config.Logger.Printf("Debug: Created route to Squid '%s' in route table '%s'", *squidInstanceId, *routeTableId)
×
925
        return subnetId, nil
×
926
}
927

928
// Generic function to create subnet, and route table
929
func setupSubnet(subnetName string, cidr string, vpcid string, ec2Svc *ec2.EC2) (*string, error) {
×
930
        // Check if subnet exists if not create it
×
931
        exsubnet, err := ec2Svc.DescribeSubnets(&ec2.DescribeSubnetsInput{
×
932
                Filters: []*ec2.Filter{
×
933
                        {
×
934
                                Name:   aws.String("cidr-block"),
×
935
                                Values: []*string{aws.String(cidr)},
×
936
                        },
×
937
                        {
×
938
                                Name:   aws.String("tag:Name"),
×
939
                                Values: []*string{aws.String(subnetName)},
×
940
                        },
×
941
                        {
×
942
                                Name:   aws.String("tag:Environment"),
×
943
                                Values: []*string{aws.String(os.Getenv("GEN3_ENDPOINT"))},
×
944
                        },
×
945
                },
×
946
        })
×
947
        if err != nil {
×
948
                return nil, err
×
949
        }
×
950
        if len(exsubnet.Subnets) > 0 {
×
951
                Config.Logger.Printf("Debug: Subnet '%s' already exists, skipping creation", subnetName)
×
952
                return exsubnet.Subnets[0].SubnetId, nil
×
953
        }
×
954

955
        // create subnet
956
        Config.Logger.Printf("Debug: Creating subnet '%v' with name '%s'", cidr, subnetName)
×
957
        sn, err := ec2Svc.CreateSubnet(&ec2.CreateSubnetInput{
×
958
                CidrBlock: aws.String(cidr),
×
959
                VpcId:     aws.String(vpcid),
×
960
                TagSpecifications: []*ec2.TagSpecification{
×
961
                        {
×
962
                                // Name
×
963
                                ResourceType: aws.String("subnet"),
×
964
                                Tags: []*ec2.Tag{
×
965
                                        {
×
966
                                                Key:   aws.String("Name"),
×
967
                                                Value: aws.String(subnetName),
×
968
                                        },
×
969
                                        {
×
970
                                                Key:   aws.String("Environment"),
×
971
                                                Value: aws.String(os.Getenv("GEN3_ENDPOINT")),
×
972
                                        },
×
973
                                },
×
974
                        },
×
975
                },
×
976
        })
×
977
        if err != nil {
×
978
                return nil, err
×
979
        }
×
980
        return sn.Subnet.SubnetId, nil
×
981
}
982

983
func setupRouteTable(hostname string, userName string, ec2svc *ec2.EC2, vpcid string, igwid string, routeTableName string) (*string, error) {
×
984
        // Check if route table exists
×
985
        exrouteTable, err := ec2svc.DescribeRouteTables(&ec2.DescribeRouteTablesInput{
×
986
                Filters: []*ec2.Filter{
×
987
                        {
×
988
                                Name:   aws.String("tag:Name"),
×
989
                                Values: []*string{aws.String(routeTableName)},
×
990
                        },
×
991
                        {
×
992
                                Name:   aws.String("tag:Environment"),
×
993
                                Values: []*string{aws.String(os.Getenv("GEN3_ENDPOINT"))},
×
994
                        },
×
995
                },
×
996
        })
×
997
        if err != nil {
×
998
                return nil, err
×
999
        }
×
1000

1001
        if len(exrouteTable.RouteTables) > 0 {
×
1002
                Config.Logger.Printf("Debug: Route table '%s' already exists, skipping creation", routeTableName)
×
1003
                return exrouteTable.RouteTables[0].RouteTableId, nil
×
1004
        }
×
1005
        routeTable, err := ec2svc.CreateRouteTable(&ec2.CreateRouteTableInput{
×
1006
                VpcId: &vpcid,
×
1007
                TagSpecifications: []*ec2.TagSpecification{
×
1008
                        {
×
1009
                                // Name
×
1010
                                ResourceType: aws.String("route-table"),
×
1011
                                Tags: []*ec2.Tag{
×
1012
                                        {
×
1013
                                                Key:   aws.String("Name"),
×
1014
                                                Value: aws.String(routeTableName),
×
1015
                                        },
×
1016
                                        {
×
1017
                                                Key:   aws.String("Environment"),
×
1018
                                                Value: aws.String(os.Getenv("GEN3_ENDPOINT")),
×
1019
                                        },
×
1020
                                },
×
1021
                        },
×
1022
                },
×
1023
        })
×
1024
        if err != nil {
×
1025
                return nil, err
×
1026
        }
×
1027
        Config.Logger.Printf("Debug: Created route table '%s' with name '%s'", *routeTable.RouteTable.RouteTableId, routeTableName)
×
1028

×
1029
        if routeTableName == fmt.Sprintf("%s-nf-fw-rt-%s", hostname, userName) {
×
1030
                // create route
×
1031
                Config.Logger.Printf("Debug: Creating route to internet '%s' in route table '%s'", igwid, *routeTable.RouteTable.RouteTableId)
×
1032
                _, err = ec2svc.CreateRoute(&ec2.CreateRouteInput{
×
1033
                        DestinationCidrBlock: aws.String("0.0.0.0/0"),
×
1034
                        GatewayId:            aws.String(igwid),
×
1035
                        RouteTableId:         routeTable.RouteTable.RouteTableId,
×
1036
                })
×
1037
                if err != nil {
×
1038
                        return nil, err
×
1039
                }
×
1040
        }
1041
        return routeTable.RouteTable.RouteTableId, nil
×
1042
}
1043

1044
func associateRouteTablesToSubnets(ec2svc *ec2.EC2, subnets []string, routeTableId string) error {
×
1045
        // associate route tables to subnets
×
1046
        for _, subnet := range subnets {
×
1047
                _, err := ec2svc.AssociateRouteTable(&ec2.AssociateRouteTableInput{
×
1048
                        RouteTableId: aws.String(routeTableId),
×
1049
                        SubnetId:     aws.String(subnet),
×
1050
                })
×
1051
                if err != nil {
×
1052
                        return err
×
1053
                }
×
1054
                Config.Logger.Printf("Debug: Associated route table '%s' to subnet '%s'", routeTableId, subnet)
×
1055
        }
1056
        return nil
×
1057
}
1058

1059
func launchSquidInstance(hostname string, userName string, ec2svc *ec2.EC2, subnetId *string, vpcId string, subnet string) (*string, error) {
×
1060
        instanceName := fmt.Sprintf("%s-nf-squid-%s", hostname, userName)
×
1061

×
1062
        // check if instance already exists, if it does start it
×
1063
        // Check that the state of existing instance is either stopped,stopping or running
×
1064
        descInstanceInput := &ec2.DescribeInstancesInput{
×
1065
                Filters: []*ec2.Filter{
×
1066
                        {
×
1067
                                Name:   aws.String("instance-state-name"),
×
1068
                                Values: []*string{aws.String("stopped"), aws.String("stopping"), aws.String("running"), aws.String("pending")},
×
1069
                        },
×
1070
                        {
×
1071
                                Name:   aws.String("tag:Name"),
×
1072
                                Values: []*string{aws.String(instanceName)},
×
1073
                        },
×
1074
                        {
×
1075
                                Name:   aws.String("tag:Environment"),
×
1076
                                Values: []*string{aws.String(os.Getenv("GEN3_ENDPOINT"))},
×
1077
                        },
×
1078
                },
×
1079
        }
×
1080
        exinstance, err := ec2svc.DescribeInstances(descInstanceInput)
×
1081
        if err != nil {
×
1082
                return nil, err
×
1083
        }
×
1084

1085
        var instanceId string
×
1086
        if len(exinstance.Reservations) > 0 { // instance already exists
×
1087
                instanceId = *exinstance.Reservations[0].Instances[0].InstanceId
×
1088
        } else { // instance does not already exist: create it
×
1089
                // User data script to install and run Squid
×
1090
                userData := `#!/bin/bash
×
1091
USER="ec2-user"
×
1092
USER_HOME="/home/$USER"
×
1093
CLOUD_AUTOMATION="$USER_HOME/cloud-automation"
×
1094
(
×
1095
cd $USER_HOME
×
1096
sudo yum update -y
×
1097
sudo yum install git lsof -y
×
1098
git clone https://github.com/uc-cdis/cloud-automation.git
×
1099
cd $CLOUD_AUTOMATION
×
1100
git pull
×
1101

×
1102
chown -R $USER. $CLOUD_AUTOMATION
×
1103
cd $USER_HOME
×
1104

×
1105
# Configure iptables
×
1106
cp ${CLOUD_AUTOMATION}/flavors/squid_auto/startup_configs/iptables-docker.conf /etc/iptables.conf
×
1107
cp ${CLOUD_AUTOMATION}/flavors/squid_auto/startup_configs/iptables-rules /etc/network/if-up.d/iptables-rules
×
1108

×
1109
chown root: /etc/network/if-up.d/iptables-rules
×
1110
chmod 0755 /etc/network/if-up.d/iptables-rules
×
1111

×
1112
## Enable iptables for NAT. We need this so that the proxy can be used transparently
×
1113
iptables-restore < /etc/iptables.conf
×
1114
iptables-save > /etc/sysconfig/iptables
×
1115

×
1116
SQUID_CONFIG_DIR="/etc/squid"
×
1117
SQUID_LOGS_DIR="/var/log/squid"
×
1118
SQUID_CACHE_DIR="/var/cache/squid"
×
1119

×
1120
###############################################################
×
1121
# Squid configuration files
×
1122
###############################################################
×
1123
mkdir -p ${SQUID_CONFIG_DIR}/ssl
×
1124
cp ${CLOUD_AUTOMATION}/files/squid_whitelist/ftp_whitelist ${SQUID_CONFIG_DIR}/ftp_whitelist
×
1125
cp ${CLOUD_AUTOMATION}/files/squid_whitelist/web_whitelist ${SQUID_CONFIG_DIR}/web_whitelist
×
1126
cp ${CLOUD_AUTOMATION}/files/squid_whitelist/web_wildcard_whitelist ${SQUID_CONFIG_DIR}/web_wildcard_whitelist
×
1127
cp ${CLOUD_AUTOMATION}/flavors/squid_auto/startup_configs/squid.conf ${SQUID_CONFIG_DIR}/squid.conf
×
1128
cp ${CLOUD_AUTOMATION}/flavors/squid_auto/startup_configs/cachemgr.conf ${SQUID_CONFIG_DIR}/cachemgr.conf
×
1129
cp ${CLOUD_AUTOMATION}/flavors/squid_auto/startup_configs/errorpage.css ${SQUID_CONFIG_DIR}/errorpage.css
×
1130
cp ${CLOUD_AUTOMATION}/flavors/squid_auto/startup_configs/mime.conf ${SQUID_CONFIG_DIR}/mime.conf
×
1131
// use a sed command to replace pid_filename xxxx to pid_filename none
×
1132
sed -i 's/pid_filename .*/pid_filename none/g' ${SQUID_CONFIG_DIR}/squid.conf
×
1133

×
1134

×
1135
#####################
×
1136
# for HTTPS
×
1137
#####################
×
1138
openssl genrsa -out ${SQUID_CONFIG_DIR}/ssl/squid.key 2048
×
1139
openssl req -new -key ${SQUID_CONFIG_DIR}/ssl/squid.key -out ${SQUID_CONFIG_DIR}/ssl/squid.csr -subj '/C=XX/ST=XX/L=squid/O=squid/CN=squid'
×
1140
openssl x509 -req -days 3650 -in ${SQUID_CONFIG_DIR}/ssl/squid.csr -signkey ${SQUID_CONFIG_DIR}/ssl/squid.key -out ${SQUID_CONFIG_DIR}/ssl/squid.crt
×
1141
cat ${SQUID_CONFIG_DIR}/ssl/squid.key ${SQUID_CONFIG_DIR}/ssl/squid.crt | sudo tee ${SQUID_CONFIG_DIR}/ssl/squid.pem
×
1142
mkdir -p ${SQUID_LOGS_DIR} ${SQUID_CACHE_DIR}
×
1143
chown -R nobody:nogroup ${SQUID_LOGS_DIR} ${SQUID_CACHE_DIR} ${SQUID_CONFIG_DIR}
×
1144

×
1145
systemctl restart docker
×
1146
$(command -v docker) run --name squid --restart=always --network=host -d \
×
1147
        --volume ${SQUID_LOGS_DIR}:${SQUID_LOGS_DIR} \
×
1148
        --volume ${SQUID_CACHE_DIR}:${SQUID_CACHE_DIR} \
×
1149
        --volume ${SQUID_CONFIG_DIR}:${SQUID_CONFIG_DIR}:ro \
×
1150
        quay.io/cdis/squid:master
×
1151

×
1152

×
1153
) > /var/log/bootstrapping_script.log`
×
1154

×
1155
                // Set private IP to be the 10th ip in subnet range
×
1156
                _, ipnet, _ := net.ParseCIDR(subnet)
×
1157
                privateIP := ipnet.IP
×
1158
                privateIP[3] += 10
×
1159
                Config.Logger.Print("Debug: Private IP: ", privateIP.String())
×
1160

×
1161
                // Get the latest amazonlinux AMI
×
1162
                amiId, err := getLatestAmazonLinuxAmi(ec2svc)
×
1163
                if err != nil {
×
1164
                        return nil, err
×
1165
                }
×
1166

1167
                sgId, err := setupFwSecurityGroup(hostname, userName, ec2svc, &vpcId)
×
1168
                if err != nil {
×
1169
                        return nil, err
×
1170
                }
×
1171

1172
                // instance type
1173
                // TODO: we could make this configurable via hatchery config (would need to change this
1174
                // function to update the instance type if the instance already exists)
1175
                instanceType := "t2.micro"
×
1176

×
1177
                // Launch EC2 instance
×
1178
                squid, err := ec2svc.RunInstances(&ec2.RunInstancesInput{
×
1179
                        ImageId:      amiId,
×
1180
                        InstanceType: aws.String(instanceType),
×
1181
                        MinCount:     aws.Int64(1),
×
1182
                        MaxCount:     aws.Int64(1),
×
1183
                        // Network interfaces
×
1184
                        NetworkInterfaces: []*ec2.InstanceNetworkInterfaceSpecification{
×
1185
                                {
×
1186
                                        AssociatePublicIpAddress: aws.Bool(true),
×
1187
                                        DeviceIndex:              aws.Int64(0),
×
1188
                                        DeleteOnTermination:      aws.Bool(true),
×
1189
                                        SubnetId:                 subnetId,
×
1190
                                        Groups:                   []*string{sgId},
×
1191
                                },
×
1192
                        },
×
1193
                        // base64 encoded user data script
×
1194
                        UserData: aws.String(base64.StdEncoding.EncodeToString([]byte(userData))),
×
1195
                        TagSpecifications: []*ec2.TagSpecification{
×
1196
                                {
×
1197
                                        ResourceType: aws.String("instance"),
×
1198
                                        Tags: []*ec2.Tag{
×
1199
                                                {
×
1200
                                                        Key:   aws.String("Name"),
×
1201
                                                        Value: aws.String(instanceName),
×
1202
                                                },
×
1203
                                                {
×
1204
                                                        Key:   aws.String("Environment"),
×
1205
                                                        Value: aws.String(os.Getenv("GEN3_ENDPOINT")),
×
1206
                                                },
×
1207
                                        },
×
1208
                                },
×
1209
                        },
×
1210
                })
×
1211
                if err != nil {
×
1212
                        Config.Logger.Print("Error launching instance: ", err)
×
1213
                        return nil, err
×
1214
                }
×
1215

1216
                // make sure the eni has source/destionation check disabled
1217
                // https://docs.aws.amazon.com/vpc/latest/userguide/VPC_NAT_Instance.html#EIP_Disable_SrcDestCheck
1218
                _, err = ec2svc.ModifyNetworkInterfaceAttribute(&ec2.ModifyNetworkInterfaceAttributeInput{
×
1219
                        NetworkInterfaceId: squid.Instances[0].NetworkInterfaces[0].NetworkInterfaceId,
×
1220
                        SourceDestCheck: &ec2.AttributeBooleanValue{
×
1221
                                Value: aws.Bool(false),
×
1222
                        },
×
1223
                })
×
1224
                if err != nil {
×
1225
                        return nil, err
×
1226
                }
×
1227

1228
                Config.Logger.Print("Debug: Launched Squid instance")
×
1229
                instanceId = *squid.Instances[0].InstanceId
×
1230
        }
1231

1232
        // Wait until the instance is running
1233
        maxIter := 6
×
1234
        iterDelaySecs := 10
×
1235
        var instanceState string
×
1236
        for i := 0; ; i++ {
×
1237
                exinstance, err = ec2svc.DescribeInstances(descInstanceInput)
×
1238
                if err != nil {
×
1239
                        return nil, err
×
1240
                }
×
1241
                instanceState = *exinstance.Reservations[0].Instances[0].State.Name
×
1242
                if instanceState == "running" {
×
1243
                        Config.Logger.Print("Debug: Squid instance is ready")
×
1244
                        break
×
1245
                }
1246
                if instanceState == "stopped" {
×
1247
                        Config.Logger.Print("Debug: Instance already exists and is stopped, starting it now")
×
1248
                        _, err := ec2svc.StartInstances(&ec2.StartInstancesInput{
×
1249
                                InstanceIds: []*string{
×
1250
                                        &instanceId,
×
1251
                                },
×
1252
                        })
×
1253
                        if err != nil {
×
1254
                                return nil, err
×
1255
                        }
×
1256
                }
1257
                if i == maxIter {
×
1258
                        return nil, fmt.Errorf("squid instance is not ready after %v seconds. Exiting", maxIter*iterDelaySecs)
×
1259
                }
×
1260
                Config.Logger.Printf("Info: Squid instance is %s, waiting %vs and checking again", instanceState, iterDelaySecs)
×
1261
                time.Sleep(time.Duration(iterDelaySecs) * time.Second)
×
1262
        }
1263

1264
        return &instanceId, nil
×
1265
}
1266

1267
func setupFwSecurityGroup(hostname string, userName string, ec2svc *ec2.EC2, vpcId *string) (*string, error) {
×
1268
        // create security group
×
1269
        sgName := fmt.Sprintf("%s-nf-sg-fw-%s", hostname, userName)
×
1270

×
1271
        // Check if security group exists
×
1272
        exsecurityGroup, err := ec2svc.DescribeSecurityGroups(&ec2.DescribeSecurityGroupsInput{
×
1273
                Filters: []*ec2.Filter{
×
1274
                        {
×
1275
                                Name:   aws.String("group-name"),
×
1276
                                Values: []*string{aws.String(sgName)},
×
1277
                        },
×
1278
                        {
×
1279
                                Name:   aws.String("vpc-id"),
×
1280
                                Values: []*string{vpcId},
×
1281
                        },
×
1282
                },
×
1283
        })
×
1284
        if err != nil {
×
1285
                return nil, err
×
1286
        }
×
1287
        if len(exsecurityGroup.SecurityGroups) > 0 {
×
1288
                Config.Logger.Printf("Debug: Security group '%s' already exists, skipping creation", sgName)
×
1289
                return exsecurityGroup.SecurityGroups[0].GroupId, nil
×
1290
        }
×
1291

1292
        sgDesc := "Security group for nextflow Squid"
×
1293
        sgId, err := ec2svc.CreateSecurityGroup(&ec2.CreateSecurityGroupInput{
×
1294
                Description: &sgDesc,
×
1295
                GroupName:   &sgName,
×
1296
                VpcId:       vpcId,
×
1297
        })
×
1298
        if err != nil {
×
1299
                Config.Logger.Printf("Error creating security group '%s': %v", sgName, err)
×
1300
                return nil, err
×
1301
        }
×
1302

1303
        // Add ingress rules
1304
        _, err = ec2svc.AuthorizeSecurityGroupIngress(&ec2.AuthorizeSecurityGroupIngressInput{
×
1305
                GroupId: sgId.GroupId,
×
1306
                IpPermissions: []*ec2.IpPermission{
×
1307
                        {
×
1308
                                FromPort:   aws.Int64(0),
×
1309
                                ToPort:     aws.Int64(65535),
×
1310
                                IpProtocol: aws.String("tcp"),
×
1311
                                IpRanges: []*ec2.IpRange{
×
1312
                                        {
×
1313
                                                // TODO: make this configurable?
×
1314
                                                CidrIp: aws.String("192.168.0.0/16"),
×
1315
                                        },
×
1316
                                },
×
1317
                        },
×
1318
                },
×
1319
        })
×
1320
        if err != nil {
×
1321
                Config.Logger.Print("Error adding ingress rule to security group: ", err)
×
1322
                return nil, err
×
1323
        }
×
1324

1325
        return sgId.GroupId, nil
×
1326
}
1327

1328
// Get latest amazonlinux ami
1329
func getLatestAmazonLinuxAmi(ec2svc *ec2.EC2) (*string, error) {
×
1330
        ami, err := ec2svc.DescribeImages(&ec2.DescribeImagesInput{
×
1331
                Filters: []*ec2.Filter{
×
1332
                        {
×
1333
                                Name: aws.String("name"),
×
1334
                                Values: []*string{
×
1335
                                        aws.String("amzn2-ami-ecs-hvm-2.0.*"),
×
1336
                                },
×
1337
                        },
×
1338
                        {
×
1339
                                Name:   aws.String("architecture"),
×
1340
                                Values: []*string{aws.String("x86_64")},
×
1341
                        },
×
1342
                },
×
1343
                Owners: []*string{
×
1344
                        aws.String("amazon"),
×
1345
                },
×
1346
        })
×
1347
        if err != nil {
×
1348
                Config.Logger.Print("Error getting latest amazonlinux AMI: ", err)
×
1349
                return nil, err
×
1350
        }
×
1351

1352
        if len(ami.Images) > 0 {
×
1353
                latestImage := ami.Images[0]
×
1354
                latestTimeStamp := time.Unix(0, 0).UTC()
×
1355

×
1356
                for _, image := range ami.Images {
×
1357

×
1358
                        creationTimeStamp, _ := time.Parse(time.RFC3339, *image.CreationDate)
×
1359

×
1360
                        if creationTimeStamp.After(latestTimeStamp) {
×
1361
                                latestTimeStamp = creationTimeStamp
×
1362
                                latestImage = image
×
1363
                        }
×
1364

1365
                }
1366

1367
                Config.Logger.Printf("Info: Found latest amazonlinux AMI: '%s'", *latestImage.ImageId)
×
1368
                return latestImage.ImageId, nil
×
1369
        }
NEW
1370
        return nil, errors.New("no amazonlinux AMI found")
×
1371
}
1372

1373
func getNextflowInstanceAmi(imageBuilderReaderRoleArn string, nextflowConfig NextflowConfig, imagebuilderListImagePipelineImages func(*imagebuilder.ListImagePipelineImagesInput) (*imagebuilder.ListImagePipelineImagesOutput, error)) (string, error) {
4✔
1374
        // the `imagebuilderListImagePipelineImages` parameter should not be provided in production. It allows
4✔
1375
        // us to test this function by mocking `imagebuilder.ListImagePipelineImages` in the tests.
4✔
1376
        var err error
4✔
1377
        ami := nextflowConfig.InstanceAmi
4✔
1378
        if ami != "" {
6✔
1379
                Config.Logger.Printf("Using configured 'nextflow.instance-ami' '%s' and ignoring 'nextflow.instance-ami-builder-arn'", ami)
2✔
1380
        } else if nextflowConfig.InstanceAmiBuilderArn != "" {
5✔
1381
                ami, err = getLatestImageBuilderAmi(imageBuilderReaderRoleArn, nextflowConfig.InstanceAmiBuilderArn, imagebuilderListImagePipelineImages)
1✔
1382
                if err != nil {
1✔
NEW
1383
                        return "", err
×
NEW
1384
                }
×
1385
        } else {
1✔
1386
                return "", fmt.Errorf("one of 'nextflow.instance-ami' and 'nextflow.instance-ami-builder-arn' must be configured")
1✔
1387
        }
1✔
1388
        return ami, err
3✔
1389
}
1390

1391
// delete the AWS resources created to launch nextflow workflows
1392
func cleanUpNextflowResources(userName string) error {
7✔
1393
        payModel, err := getCurrentPayModel(userName)
7✔
1394
        if err != nil {
7✔
1395
                return err
×
1396
        }
×
1397

1398
        // credentials and AWS services init
1399
        sess := session.Must(session.NewSession(&aws.Config{
7✔
1400
                Region: aws.String("us-east-1"),
7✔
1401
        }))
7✔
1402
        awsAccountId, awsConfig, err := getNextflowAwsSettings(sess, payModel, userName, "deleting")
7✔
1403
        if err != nil {
11✔
1404
                return err
4✔
1405
        }
4✔
1406
        Config.Logger.Printf("Debug: AWS account ID: '%v'", awsAccountId)
3✔
1407
        iamSvc := iam.New(sess, &awsConfig)
3✔
1408
        ec2Svc := ec2.New(sess, &awsConfig)
3✔
1409

3✔
1410
        userName = escapism(userName)
3✔
1411
        hostname := strings.ReplaceAll(os.Getenv("GEN3_ENDPOINT"), ".", "-")
3✔
1412

3✔
1413
        // delete the user's access keys
3✔
1414
        nextflowUserName := fmt.Sprintf("%s-nf-%s", hostname, userName)
3✔
1415
        err = deleteUserAccessKeys(nextflowUserName, iamSvc)
3✔
1416
        if err != nil {
6✔
1417
                Config.Logger.Printf("Unable to delete access keys for user '%s': %v", nextflowUserName, err)
3✔
1418
                return err
3✔
1419
        }
3✔
1420

1421
        err = stopSquidInstance(hostname, userName, ec2Svc)
×
1422
        if err != nil {
×
1423
                Config.Logger.Printf("Warning: Unable to stop Squid instance - continuing: %v", err)
×
1424
        }
×
1425

1426
        // NOTE: This was disabled because researchers may need to keep the intermediary files. Instead of
1427
        // deleting, we could set bucket lifecycle rules to delete after X days.
1428
        // NOTE: The code below works locally but not once deployed
1429

1430
        // bucketName = xyz...
1431
        // // delete the user's folder and its contents in the nextflow bucket
1432
        // objectsKey := fmt.Sprintf("%s/", userName)
1433
        // // objectsIter := s3manager.NewDeleteListIterator(s3Svc, &s3.ListObjectsInput{
1434
        // //         Bucket: &bucketName,
1435
        // //         Prefix: &objectsKey,
1436
        // // })
1437
        // objectsIter := s3manager.NewDeleteListIterator(s3Svc, &s3.ListObjectsInput{
1438
        //         Bucket: aws.String("xxx-nf"),
1439
        //         Prefix: aws.String("xxx-40uchicago-2eedu/"),
1440
        // })
1441
        // if err := s3manager.NewBatchDeleteWithClient(s3Svc).Delete(context.Background(), objectsIter); err != nil {
1442
        //         Config.Logger.Printf("Unable to delete objects in bucket '%s' at '%s' - continuing: %v", bucketName, objectsKey, err)
1443
        // } else {
1444
        //         Config.Logger.Printf("Debug: Deleted objects in bucket '%s' at '%s'", bucketName, objectsKey)
1445
        // }
1446

1447
        return nil
×
1448
}
1449

1450
func deleteUserAccessKeys(nextflowUserName string, iamSvc *iam.IAM) error {
3✔
1451
        listAccessKeysResult, err := iamSvc.ListAccessKeys(&iam.ListAccessKeysInput{
3✔
1452
                UserName: &nextflowUserName,
3✔
1453
        })
3✔
1454
        if err != nil {
6✔
1455
                Config.Logger.Printf("Unable to list access keys for user '%s': %v", nextflowUserName, err)
3✔
1456
                return err
3✔
1457
        }
3✔
1458
        for _, key := range listAccessKeysResult.AccessKeyMetadata {
×
1459
                Config.Logger.Printf("Deleting access key '%s' for user '%s'", *key.AccessKeyId, nextflowUserName)
×
1460
                _, err := iamSvc.DeleteAccessKey(&iam.DeleteAccessKeyInput{
×
1461
                        UserName:    &nextflowUserName,
×
1462
                        AccessKeyId: key.AccessKeyId,
×
1463
                })
×
1464
                if err != nil {
×
1465
                        Config.Logger.Printf("Warning: Unable to delete access key '%s' for user '%s' - continuing: %v", *key.AccessKeyId, nextflowUserName, err)
×
1466
                }
×
1467
        }
1468
        Config.Logger.Printf("Debug: Deleted all access keys for Nextflow AWS user '%s'", nextflowUserName)
×
1469
        return nil
×
1470
}
1471

1472
func stopSquidInstance(hostname string, userName string, ec2svc *ec2.EC2) error {
×
1473
        // check if instance already exists, if it does stop it and return
×
1474
        exinstance, err := ec2svc.DescribeInstances(&ec2.DescribeInstancesInput{
×
1475
                Filters: []*ec2.Filter{
×
1476
                        {
×
1477
                                Name:   aws.String("instance-state-name"),
×
1478
                                Values: []*string{aws.String("stopped"), aws.String("stopping"), aws.String("running"), aws.String("pending")},
×
1479
                        },
×
1480
                        {
×
1481
                                Name:   aws.String("tag:Name"),
×
1482
                                Values: []*string{aws.String(fmt.Sprintf("%s-nf-squid-%s", hostname, userName))},
×
1483
                        },
×
1484
                        {
×
1485
                                Name:   aws.String("tag:Environment"),
×
1486
                                Values: []*string{aws.String(os.Getenv("GEN3_ENDPOINT"))},
×
1487
                        },
×
1488
                },
×
1489
        })
×
1490
        if err != nil {
×
1491
                return err
×
1492
        }
×
1493
        if len(exinstance.Reservations) > 0 {
×
1494
                // Make sure the instance is stopped
×
1495
                if *exinstance.Reservations[0].Instances[0].State.Name == "stopped" {
×
1496
                        Config.Logger.Print("Debug: Squid instance already stopped, skipping")
×
1497
                        return nil
×
1498
                }
×
1499

1500
                // Terminate the instance
1501
                Config.Logger.Print("Debug: running Squid instance found, terminating it now")
×
1502
                _, err := ec2svc.TerminateInstances(&ec2.TerminateInstancesInput{
×
1503
                        InstanceIds: []*string{
×
1504
                                exinstance.Reservations[0].Instances[0].InstanceId,
×
1505
                        },
×
1506
                })
×
1507
                if err != nil {
×
1508
                        return err
×
1509
                }
×
1510
        }
1511
        return nil
×
1512
}
1513

1514
var generateNextflowConfig = func(userName string) (string, error) {
×
1515
        sess := session.Must(session.NewSession(&aws.Config{
×
1516
                Region: aws.String("us-east-1"),
×
1517
        }))
×
1518
        payModel, err := getCurrentPayModel(userName)
×
1519
        if err != nil {
×
1520
                return "", err
×
1521
        }
×
1522
        awsAccountId, awsConfig, err := getNextflowAwsSettings(sess, payModel, userName, "fetching")
×
1523
        if err != nil {
×
1524
                return "", err
×
1525
        }
×
1526

1527
        // get the queue name
1528
        userName = escapism(userName)
×
1529
        hostname := strings.ReplaceAll(os.Getenv("GEN3_ENDPOINT"), ".", "-")
×
1530
        batchJobQueueName := fmt.Sprintf("%s-nf-job-queue-%s", hostname, userName)
×
1531

×
1532
        // get the work dir
×
1533
        bucketName := fmt.Sprintf("%s-nf-%s", hostname, awsAccountId)
×
1534
        workDir := fmt.Sprintf("s3://%s/%s", bucketName, userName)
×
1535

×
1536
        // get the jobs role
×
1537
        tag := fmt.Sprintf("%s-hatchery-nf-%s", hostname, userName)
×
1538
        pathPrefix := aws.String(fmt.Sprintf("/%s/", tag))
×
1539
        iamSvc := iam.New(sess, &awsConfig)
×
1540
        listRolesResult, err := iamSvc.ListRoles(&iam.ListRolesInput{
×
1541
                PathPrefix: pathPrefix,
×
1542
        })
×
1543
        if err != nil || len(listRolesResult.Roles) == 0 {
×
1544
                Config.Logger.Printf("Error getting role with path prefix '%s', which should already exist: %v", *pathPrefix, err)
×
1545
                return "", err
×
1546
        }
×
1547
        nextflowJobsRoleArn := *listRolesResult.Roles[0].Arn
×
1548

×
1549
        Config.Logger.Printf("Generating Nextflow configuration with: Batch queue: '%s'. Job role: '%s'. Workdir: '%s'.", batchJobQueueName, nextflowJobsRoleArn, workDir)
×
1550

×
1551
        // TODO "ubuntu" container may not always be authorized - replace with a public approved container?
×
1552
        configContents := fmt.Sprintf(
×
1553
                `plugins {
×
1554
        id 'nf-amazon'
×
1555
}
×
1556
process {
×
1557
        executor = 'awsbatch'
×
1558
        queue = '%s'
×
1559
        container = 'ubuntu'
×
1560
}
×
1561
aws {
×
1562
        batch {
×
1563
                cliPath = '/home/ec2-user/miniconda/bin/aws'
×
1564
                jobRole = '%s'
×
1565
        }
×
1566
}
×
1567
workDir = '%s'`,
×
1568
                batchJobQueueName,
×
1569
                nextflowJobsRoleArn,
×
1570
                workDir,
×
1571
        )
×
1572

×
UNCOV
1573
        return configContents, nil
×
1574
}
1575

1576
// replaceAllUsernamePlaceholders returns a new slice holding the strings of
// strArray, in the same order, with every occurrence of the "{{username}}"
// placeholder replaced by userName. The input slice is not modified. A nil
// slice is returned when strArray is nil or empty.
func replaceAllUsernamePlaceholders(strArray []string, userName string) []string {
	if len(strArray) == 0 {
		return nil
	}
	replaced := make([]string, len(strArray))
	for i := range strArray {
		replaced[i] = strings.ReplaceAll(strArray[i], "{{username}}", userName)
	}
	return replaced
}
1583

1584
// function to generate user data
1585
func generateEcrLoginUserData(jobImageWhitelist []string, userName string) string {
1✔
1586
        var ecrRepos []string
1✔
1587
        for _, image := range replaceAllUsernamePlaceholders(jobImageWhitelist, userName) {
5✔
1588
                if strings.Contains(image, ".ecr.") {
7✔
1589
                        // NOTE: on the ECR side, tags are ignored and users are allowed access to the whole repo.
3✔
1590
                        repo := strings.Split(image, ":")[0]
3✔
1591
                        ecrRepos = append(ecrRepos, repo)
3✔
1592
                }
3✔
1593
        }
1594

1595
        // TODO: read region from config
1596
        runCmd := ""
1✔
1597
        for _, approvedRepo := range ecrRepos {
4✔
1598
                runCmd += fmt.Sprintf(`
3✔
1599
- aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin %s`, approvedRepo)
3✔
1600
        }
3✔
1601

1602
        userData := fmt.Sprintf(`MIME-Version: 1.0
1✔
1603
Content-Type: multipart/mixed; boundary="==MYBOUNDARY=="
1✔
1604

1✔
1605
--==MYBOUNDARY==
1✔
1606
Content-Type: text/cloud-config; charset="us-ascii"
1✔
1607

1✔
1608
packages:
1✔
1609
- aws-cli
1✔
1610
runcmd:%s
1✔
1611
--==MYBOUNDARY==--`, runCmd)
1✔
1612

1✔
1613
        return base64.StdEncoding.EncodeToString([]byte(userData))
1✔
1614
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc