• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uc-cdis / hatchery / 8299721104

15 Mar 2024 04:56PM UTC coverage: 19.95% (+0.5%) from 19.474%
8299721104

Pull #96

github

paulineribeyre
master - resolve conflicts
Pull Request #96: MIDRC-543 Nextflow image builder AMI

45 of 73 new or added lines in 3 files covered. (61.64%)

1 existing line in 1 file now uncovered.

1349 of 6762 relevant lines covered (19.95%)

1.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.74
/hatchery/hatchery.go
1
package hatchery
2

3
import (
4
        "context"
5
        "encoding/json"
6
        "fmt"
7
        "html"
8
        "io"
9
        "net/http"
10
        "strconv"
11
        "strings"
12
        "text/template"
13
        "time"
14

15
        "github.com/aws/aws-sdk-go/aws"
16
        "github.com/aws/aws-sdk-go/aws/session"
17
        httptrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/net/http"
18
        k8sv1 "k8s.io/api/core/v1"
19
)
20

21
// Config is the package-global hatchery configuration shared by all handlers.
22
var Config *FullHatcheryConfig
23

24
// containerOption is the JSON representation of one workspace container
// option, as serialized by the `/options` handler.
type containerOption struct {
	// Name is copied from the container settings.
	Name string `json:"name"`
	// CPULimit is copied from the container settings.
	CPULimit string `json:"cpu-limit"`
	// MemoryLimit is copied from the container settings.
	MemoryLimit string `json:"memory-limit"`
	// ID is the key of the container in the containers map.
	ID string `json:"id"`
	// IdleTimeLimit is -1 unless the container args define a
	// `shutdown_no_activity_timeout=` value.
	IdleTimeLimit int `json:"idle-time-limit"`
}
31

32
// TextOutput is the data handed to textResult when rendering plain text.
type TextOutput struct {
	Text string
}

// textResult is a text template that writes the Text field of its data as-is
// (text/template performs no escaping).
var textResult = template.Must(template.New("").Parse("{{ .Text }}"))
37

38
// RegisterHatchery setup endpoints with the http engine
39
func RegisterHatchery(mux *httptrace.ServeMux) {
×
40
        mux.HandleFunc("/", home)
×
41
        mux.HandleFunc("/launch", launch)
×
42
        mux.HandleFunc("/terminate", terminate)
×
43
        mux.HandleFunc("/status", status)
×
44
        mux.HandleFunc("/options", options)
×
45
        mux.HandleFunc("/mount-files", mountFiles)
×
46
        mux.HandleFunc("/paymodels", paymodels)
×
47
        mux.HandleFunc("/setpaymodel", setpaymodel)
×
48
        mux.HandleFunc("/resetpaymodels", resetPaymodels)
×
49
        mux.HandleFunc("/allpaymodels", allpaymodels)
×
50

×
51
        // ECS functions
×
52
        mux.HandleFunc("/create-ecs-cluster", createECSCluster)
×
53
}
×
54

55
func home(w http.ResponseWriter, r *http.Request) {
×
56
        htmlHeader := `<html>
×
57
        <head>Gen3 Hatchery</head>
×
58
        <body>`
×
59
        fmt.Fprintln(w, htmlHeader)
×
60

×
61
        for k, v := range Config.ContainersMap {
×
62
                fmt.Fprintf(w, "<h1><a href=\"%s/launch?hash=%s\">Launch %s - %s CPU - %s Memory</a></h1>", Config.Config.SubDir, k, v.Name, v.CPULimit, v.MemoryLimit)
×
63
        }
×
64

65
        htmlFooter := `</body>
×
66
        </html>`
×
67
        fmt.Fprintln(w, htmlFooter)
×
68
}
69

70
func getCurrentUserName(r *http.Request) (userName string) {
31✔
71
        user := r.Header.Get("REMOTE_USER")
31✔
72
        if user == "" {
42✔
73
                Config.Logger.Print("Warning: No username in header REMOTE_USER!")
11✔
74
        }
11✔
75

76
        // escape username to sanitize input from http header
77
        // this escapes characters which should not be in usernames anyway (<, >, &, ' and ")
78
        user = html.EscapeString(user)
31✔
79

31✔
80
        return user
31✔
81
}
82

83
var getWorkspaceStatus = func(ctx context.Context, userName string, accessToken string) (*WorkspaceStatus, error) {
4✔
84
        allpaymodels, err := getPayModelsForUser(userName)
4✔
85
        if err != nil {
4✔
86
                return nil, err
×
87
        }
×
88

89
        if allpaymodels == nil {
5✔
90
                return statusK8sPod(ctx, userName, accessToken, nil)
1✔
91
        }
1✔
92

93
        payModel := allpaymodels.CurrentPayModel
3✔
94
        if payModel != nil && payModel.Ecs {
4✔
95
                return statusEcs(ctx, userName, accessToken, payModel.AWSAccountId)
1✔
96
        } else {
3✔
97
                return statusK8sPod(ctx, userName, accessToken, payModel)
2✔
98
        }
2✔
99
}
100

101
func paymodels(w http.ResponseWriter, r *http.Request) {
×
102
        if r.Method != "GET" {
×
103
                http.Error(w, "Not Found", http.StatusNotFound)
×
104
                return
×
105
        }
×
106
        userName := getCurrentUserName(r)
×
107

×
108
        payModel, err := getCurrentPayModel(userName)
×
109
        if err != nil {
×
110
                http.Error(w, err.Error(), http.StatusInternalServerError)
×
111
                return
×
112
        }
×
113
        if payModel == nil {
×
114
                http.Error(w, "Current paymodel not set", http.StatusNotFound)
×
115
                return
×
116
        }
×
117
        out, err := json.Marshal(payModel)
×
118
        if err != nil {
×
119
                http.Error(w, err.Error(), http.StatusInternalServerError)
×
120
                return
×
121
        }
×
122
        fmt.Fprint(w, string(out))
×
123
}
124

125
func allpaymodels(w http.ResponseWriter, r *http.Request) {
×
126
        if r.Method != "GET" {
×
127
                http.Error(w, "Not Found", http.StatusNotFound)
×
128
                return
×
129
        }
×
130
        userName := getCurrentUserName(r)
×
131

×
132
        payModels, err := getPayModelsForUser(userName)
×
133
        if err != nil {
×
134
                http.Error(w, err.Error(), http.StatusInternalServerError)
×
135
                return
×
136
        }
×
137
        if payModels == nil {
×
138
                http.Error(w, "No paymodel set", http.StatusNotFound)
×
139
                return
×
140
        }
×
141
        out, err := json.Marshal(payModels)
×
142
        if err != nil {
×
143
                http.Error(w, err.Error(), http.StatusInternalServerError)
×
144
                return
×
145
        }
×
146
        fmt.Fprint(w, string(out))
×
147
}
148

149
func setpaymodel(w http.ResponseWriter, r *http.Request) {
4✔
150
        if r.Method != "POST" {
5✔
151
                http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
1✔
152
                return
1✔
153
        }
1✔
154
        userName := getCurrentUserName(r)
3✔
155
        id := r.URL.Query().Get("id")
3✔
156
        if id == "" {
4✔
157
                http.Error(w, "Missing ID argument", http.StatusBadRequest)
1✔
158
                return
1✔
159
        }
1✔
160

161
        currentStatus, err := getWorkspaceStatus(r.Context(), userName, getBearerToken(r))
2✔
162
        if err != nil {
2✔
163
                http.Error(w, err.Error(), http.StatusInternalServerError)
×
164
                return
×
165
        }
×
166

167
        // Do not let users update status when a workpsace session is in progress
168
        if currentStatus.Status != "Not Found" {
3✔
169
                http.Error(w, "Can not update paymodel when workspace is running", http.StatusInternalServerError)
1✔
170
                return
1✔
171
        }
1✔
172

173
        pm, err := setCurrentPaymodel(userName, id)
1✔
174
        if err != nil {
1✔
175
                http.Error(w, err.Error(), http.StatusInternalServerError)
×
176
                return
×
177
        }
×
178
        out, err := json.Marshal(pm)
1✔
179
        if err != nil {
1✔
180
                http.Error(w, err.Error(), http.StatusInternalServerError)
×
181
                return
×
182
        }
×
183
        fmt.Fprint(w, string(out))
1✔
184
}
185

186
func status(w http.ResponseWriter, r *http.Request) {
×
187
        userName := getCurrentUserName(r)
×
188
        accessToken := getBearerToken(r)
×
189

×
190
        result, err := getWorkspaceStatus(r.Context(), userName, accessToken)
×
191
        if err != nil {
×
192
                http.Error(w, err.Error(), http.StatusInternalServerError)
×
193
                return
×
194
        }
×
195

196
        out, err := json.Marshal(result)
×
197
        if err != nil {
×
198
                http.Error(w, err.Error(), http.StatusInternalServerError)
×
199
                return
×
200
        }
×
201

202
        fmt.Fprint(w, string(out))
×
203
}
204

205
func resetPaymodels(w http.ResponseWriter, r *http.Request) {
4✔
206
        if r.Method != "POST" {
5✔
207
                http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
1✔
208
                return
1✔
209
        }
1✔
210
        userName := getCurrentUserName(r)
3✔
211

3✔
212
        currentStatus, err := getWorkspaceStatus(r.Context(), userName, getBearerToken(r))
3✔
213
        if err != nil {
3✔
214
                http.Error(w, err.Error(), http.StatusInternalServerError)
×
215
                return
×
216
        }
×
217

218
        // Do not let users update status when a workpsace session is in progress
219
        if currentStatus.Status != "Not Found" {
4✔
220
                http.Error(w, "Can not reset paymodels when workspace is running", http.StatusInternalServerError)
1✔
221
                return
1✔
222
        }
1✔
223

224
        err = resetCurrentPaymodel(userName)
2✔
225
        if err != nil {
3✔
226
                http.Error(w, err.Error(), http.StatusInternalServerError)
1✔
227
                return
1✔
228
        }
1✔
229

230
        fmt.Fprint(w, "Current Paymodel has been reset")
1✔
231
}
232

233
func getOptionOutputForContainer(containerId string, containerSettings Container) containerOption {
3✔
234
        c := containerOption{
3✔
235
                Name:        containerSettings.Name,
3✔
236
                CPULimit:    containerSettings.CPULimit,
3✔
237
                MemoryLimit: containerSettings.MemoryLimit,
3✔
238
                ID:          containerId,
3✔
239
        }
3✔
240
        c.IdleTimeLimit = -1
3✔
241
        for _, arg := range containerSettings.Args {
3✔
242
                if strings.Contains(arg, "shutdown_no_activity_timeout=") {
×
243
                        argSplit := strings.Split(arg, "=")
×
244
                        idleTimeLimit, err := strconv.Atoi(argSplit[len(argSplit)-1])
×
245
                        if err == nil {
×
246
                                c.IdleTimeLimit = idleTimeLimit * 1000
×
247
                        }
×
248
                        break
×
249
                }
250
        }
251

252
        return c
3✔
253
}
254

255
func options(w http.ResponseWriter, r *http.Request) {
3✔
256
        userName := getCurrentUserName(r)
3✔
257
        accessToken := getBearerToken(r)
3✔
258

3✔
259
        // handle `/options?id=abc` => return the specified option
3✔
260
        hash := r.URL.Query().Get("id")
3✔
261
        if hash != "" {
5✔
262
                containerSettings, ok := Config.ContainersMap[hash]
2✔
263
                if !ok {
2✔
264
                        http.Error(w, fmt.Sprintf("Invalid 'id' parameter '%s'", hash), http.StatusBadRequest)
×
265
                        return
×
266
                }
×
267
                allowed, err := isUserAuthorizedForContainer(userName, accessToken, Config.ContainersMap[hash])
2✔
268
                if err != nil {
2✔
269
                        Config.Logger.Printf("Unable to check if user is authorized to launch this container. Assuming unthorized. Details: %v", err)
×
270
                }
×
271
                if err != nil || !allowed {
3✔
272
                        // return the same as for an unknown id
1✔
273
                        http.Error(w, fmt.Sprintf("Invalid 'id' parameter '%s'", hash), http.StatusBadRequest)
1✔
274
                        return
1✔
275
                }
1✔
276

277
                out, err := json.Marshal(getOptionOutputForContainer(hash, containerSettings))
1✔
278
                if err != nil {
1✔
279
                        http.Error(w, err.Error(), http.StatusInternalServerError)
×
280
                        return
×
281
                }
×
282

283
                fmt.Fprint(w, string(out))
1✔
284
                return
1✔
285
        }
286

287
        // handle `/options` without `id` parameter => return all available options
288
        var options []containerOption
1✔
289
        for k, v := range Config.ContainersMap {
4✔
290
                // filter out workspace options that the user is not allowed to run
3✔
291
                allowed, err := isUserAuthorizedForContainer(userName, accessToken, v)
3✔
292
                if err != nil {
3✔
293
                        http.Error(w, err.Error(), http.StatusInternalServerError)
×
294
                        return
×
295
                }
×
296
                if !allowed {
4✔
297
                        continue // do not return containers that the user is not allowed to run
1✔
298
                }
299

300
                c := getOptionOutputForContainer(k, v)
2✔
301
                options = append(options, c)
2✔
302
        }
303

304
        out, err := json.Marshal(options)
1✔
305
        if err != nil {
1✔
306
                http.Error(w, err.Error(), http.StatusInternalServerError)
×
307
                return
×
308
        }
×
309

310
        fmt.Fprint(w, string(out))
1✔
311
}
312

313
func getWorkspaceFlavor(container Container) string {
10✔
314
        if container.NextflowConfig.Enabled {
10✔
315
                return "nextflow"
×
316
        } else if container.License.Enabled {
10✔
317
                return container.License.WorkspaceFlavor
×
318
        } else if strings.Contains(strings.ToLower(container.Name), "jupyter") {
10✔
319
                return "jupyter"
×
320
        } else {
10✔
321
                return ""
10✔
322
        }
10✔
323
}
324

325
func launch(w http.ResponseWriter, r *http.Request) {
14✔
326
        if r.Method != "POST" {
15✔
327
                http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
1✔
328
                return
1✔
329
        }
1✔
330
        accessToken := getBearerToken(r)
13✔
331

13✔
332
        hash := r.URL.Query().Get("id")
13✔
333
        if hash == "" {
14✔
334
                http.Error(w, "Missing 'id' parameter", http.StatusBadRequest)
1✔
335
                return
1✔
336
        }
1✔
337
        _, ok := Config.ContainersMap[hash]
12✔
338
        if !ok {
12✔
339
                http.Error(w, fmt.Sprintf("Invalid 'id' parameter '%s'", hash), http.StatusBadRequest)
×
340
                return
×
341
        }
×
342

343
        userName := getCurrentUserName(r)
12✔
344
        if userName == "" {
13✔
345
                http.Error(w, "No username found. Launch forbidden", http.StatusBadRequest)
1✔
346
                return
1✔
347
        }
1✔
348

349
        allowed, err := isUserAuthorizedForContainer(userName, accessToken, Config.ContainersMap[hash])
11✔
350
        if err != nil {
11✔
351
                Config.Logger.Printf("Unable to check if user is authorized to launch this container. Assuming unthorized. Details: %v", err)
×
352
        }
×
353
        if err != nil || !allowed {
12✔
354
                // return the same as for an unknown id
1✔
355
                http.Error(w, fmt.Sprintf("Invalid 'id' parameter '%s'", hash), http.StatusBadRequest)
1✔
356
                return
1✔
357
        }
1✔
358

359
        var envVars []k8sv1.EnvVar
10✔
360
        var envVarsEcs []EnvVar
10✔
361

10✔
362
        workspaceFlavor := getWorkspaceFlavor(Config.ContainersMap[hash])
10✔
363
        envVars = append(
10✔
364
                envVars,
10✔
365
                k8sv1.EnvVar{
10✔
366
                        Name:  "WORKSPACE_FLAVOR",
10✔
367
                        Value: workspaceFlavor,
10✔
368
                },
10✔
369
        )
10✔
370
        envVarsEcs = append(
10✔
371
                envVarsEcs,
10✔
372
                EnvVar{
10✔
373
                        Key:   "WORKSPACE_FLAVOR",
10✔
374
                        Value: workspaceFlavor,
10✔
375
                },
10✔
376
        )
10✔
377

10✔
378
        if Config.ContainersMap[hash].NextflowConfig.Enabled {
10✔
379
                Config.Logger.Printf("Info: Nextflow is enabled: creating Nextflow resources in AWS...")
×
NEW
380
                nextflowKeyId, nextflowKeySecret, err := createNextflowResources(userName, Config.Config.NextflowGlobalConfig, Config.ContainersMap[hash].NextflowConfig)
×
381
                if err != nil {
×
382
                        Config.Logger.Printf("Error creating Nextflow AWS resources in AWS for user '%s': %v", userName, err)
×
383
                        http.Error(w, "Unable to create AWS resources for Nextflow", http.StatusInternalServerError)
×
384
                        return
×
385
                }
×
386
                envVars = append(
×
387
                        envVars,
×
388
                        k8sv1.EnvVar{
×
389
                                Name:  "AWS_ACCESS_KEY_ID",
×
390
                                Value: nextflowKeyId,
×
391
                        },
×
392
                        k8sv1.EnvVar{
×
393
                                Name:  "AWS_SECRET_ACCESS_KEY",
×
394
                                Value: nextflowKeySecret,
×
395
                        },
×
396
                )
×
397
                envVarsEcs = append(
×
398
                        envVarsEcs,
×
399
                        EnvVar{
×
400
                                Key:   "AWS_ACCESS_KEY_ID",
×
401
                                Value: nextflowKeyId,
×
402
                        },
×
403
                        EnvVar{
×
404
                                Key:   "AWS_SECRET_ACCESS_KEY",
×
405
                                Value: nextflowKeySecret,
×
406
                        },
×
407
                )
×
408
                // TODO do we need to set AWS_DEFAULT_REGION too?
409
        } else {
10✔
410
                Config.Logger.Printf("Debug: Nextflow is not enabled: skipping Nextflow resources creation")
10✔
411
        }
10✔
412

413
        if Config.ContainersMap[hash].License.Enabled {
10✔
414
                Config.Logger.Printf(
×
415
                        "Info: Running licensed workspace: %s", Config.ContainersMap[hash].License.WorkspaceFlavor)
×
416
                dbconfig := initializeDbConfig()
×
417
                activeGen3LicenseUsers, err := getActiveGen3LicenseUserMaps(dbconfig, Config.ContainersMap[hash])
×
418
                if err != nil {
×
419
                        Config.Logger.Printf(err.Error())
×
420
                }
×
421
                // Check for config max
422
                nextLicenseId := getNextLicenseId(activeGen3LicenseUsers, Config.ContainersMap[hash].License.MaxLicenseIds)
×
423
                if nextLicenseId == 0 {
×
424
                        Config.Logger.Printf("Error: no available license ids")
×
425
                        return
×
426
                }
×
427
                newItem, err := createGen3LicenseUserMap(dbconfig, userName, nextLicenseId, Config.ContainersMap[hash])
×
428
                if err != nil {
×
429
                        Config.Logger.Printf(err.Error())
×
430
                }
×
431
                Config.Logger.Printf("Created new license-user-map item: %v", newItem)
×
432

433
        }
434

435
        allpaymodels, err := getPayModelsForUser(userName)
10✔
436
        if err != nil {
10✔
437
                Config.Logger.Printf(err.Error())
×
438
        }
×
439
        if allpaymodels == nil { // Commons with no concept of paymodels
14✔
440
                err = createLocalK8sPod(r.Context(), hash, userName, accessToken, envVars)
4✔
441
        } else {
10✔
442
                payModel := allpaymodels.CurrentPayModel
6✔
443
                if payModel == nil {
7✔
444
                        Config.Logger.Printf("Current Paymodel is not set. Launch forbidden for user %s", userName)
1✔
445
                        http.Error(w, "Current Paymodel is not set. Launch forbidden", http.StatusInternalServerError)
1✔
446
                        return
1✔
447
                } else if payModel.Local {
7✔
448
                        err = createLocalK8sPod(r.Context(), hash, userName, accessToken, envVars)
1✔
449
                } else if payModel.Ecs {
7✔
450

2✔
451
                        if payModel.Status != "active" {
3✔
452
                                // send 500 response.
1✔
453
                                // TODO: 403 is the correct code, but it triggers a 302 to the default 403 page in revproxy instead of showing error message.
1✔
454
                                Config.Logger.Printf("Paymodel is not active. Launch forbidden for user %s", userName)
1✔
455
                                http.Error(w, "Paymodel is not active. Launch forbidden", http.StatusInternalServerError)
1✔
456
                                return
1✔
457
                        }
1✔
458

459
                        Config.Logger.Printf("Launching ECS workspace for user %s", userName)
1✔
460
                        // Sending a 200 response straight away, but starting the launch in a goroutine
1✔
461
                        // TODO: Do more sanity checks before returning 200.
1✔
462
                        w.WriteHeader(http.StatusOK)
1✔
463
                        go launchEcsWorkspaceWrapper(userName, hash, accessToken, *payModel, envVarsEcs)
1✔
464
                        fmt.Fprintf(w, "Launch accepted")
1✔
465
                        return
1✔
466
                } else {
2✔
467
                        err = createExternalK8sPod(r.Context(), hash, userName, accessToken, *payModel, envVars)
2✔
468
                }
2✔
469
        }
470
        if err != nil {
9✔
471
                Config.Logger.Printf("error during launch: %-v", err)
2✔
472
                http.Error(w, err.Error(), http.StatusInternalServerError)
2✔
473
                return
2✔
474
        }
2✔
475
        fmt.Fprintf(w, "Success")
5✔
476
}
477

478
func terminate(w http.ResponseWriter, r *http.Request) {
9✔
479
        if r.Method != "POST" {
10✔
480
                http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
1✔
481
                return
1✔
482
        }
1✔
483
        accessToken := getBearerToken(r)
8✔
484
        userName := getCurrentUserName(r)
8✔
485
        if userName == "" {
9✔
486
                http.Error(w, "No username found. Unable to terminate", http.StatusBadRequest)
1✔
487
                return
1✔
488
        }
1✔
489
        Config.Logger.Printf("Terminating workspace for user %s", userName)
7✔
490

7✔
491
        // mark any gen3-licensed sessions as inactive
7✔
492
        Config.Logger.Printf("Checking for gen3 license items for user: %s", userName)
7✔
493
        dbconfig := initializeDbConfig()
7✔
494
        activeGen3LicenseUsers, userlicerr := getLicenseUserMapsForUser(dbconfig, userName)
7✔
495
        if userlicerr != nil {
7✔
496
                Config.Logger.Printf(userlicerr.Error())
×
497
        }
×
498
        Config.Logger.Printf("Debug: Active gen3 license user maps %v", activeGen3LicenseUsers)
7✔
499
        if len(*activeGen3LicenseUsers) == 0 {
14✔
500
                Config.Logger.Printf("No active gen3 license sessions for user: %s", userName)
7✔
501
        } else {
7✔
502
                for _, v := range *activeGen3LicenseUsers {
×
503
                        if v.UserId == userName {
×
504
                                Config.Logger.Printf("Debug: updating gen3 license user map as inactive for itemId %s", v.ItemId)
×
505
                                _, err := setGen3LicenseUserInactive(dbconfig, v.ItemId)
×
506
                                if err != nil {
×
507
                                        Config.Logger.Printf(err.Error())
×
508
                                }
×
509
                        }
510
                }
511
        }
512

513
        // delete nextflow resources. There is no way to know if the actual workspace being
514
        // terminated is a nextflow workspace or not, so always attempt to delete
515
        Config.Logger.Printf("Info: Deleting Nextflow resources in AWS...")
7✔
516
        err := cleanUpNextflowResources(userName)
7✔
517
        if err != nil {
14✔
518
                Config.Logger.Printf("Unable to delete AWS resources for Nextflow... continuing anyway")
7✔
519
        }
7✔
520

521
        payModel, err := getCurrentPayModel(userName)
7✔
522
        if err != nil {
7✔
523
                Config.Logger.Printf(err.Error())
×
524
        }
×
525
        if payModel != nil && payModel.Ecs {
10✔
526
                _, err = terminateEcsWorkspace(r.Context(), userName, accessToken, payModel.AWSAccountId)
3✔
527
                if err != nil {
4✔
528
                        http.Error(w, err.Error(), http.StatusInternalServerError)
1✔
529
                        return
1✔
530
                } else {
3✔
531
                        Config.Logger.Printf("Succesfully terminated all resources related to ECS workspace for user %s", userName)
2✔
532
                        fmt.Fprintf(w, "Terminated ECS workspace")
2✔
533
                }
2✔
534
        } else {
4✔
535
                err := deleteK8sPod(r.Context(), userName, accessToken, payModel)
4✔
536
                if err != nil {
5✔
537
                        http.Error(w, err.Error(), http.StatusInternalServerError)
1✔
538
                        return
1✔
539
                }
1✔
540
                Config.Logger.Printf("Terminated workspace for user %s", userName)
3✔
541
                fmt.Fprintf(w, "Terminated workspace")
3✔
542
        }
543

544
        // Need to reset pay model only after workspace termination is completed.
545
        go func() {
10✔
546
                // Periodically poll for status, until it is set as "Not Found"
5✔
547
                for {
12✔
548
                        status, err := getWorkspaceStatus(r.Context(), userName, accessToken)
7✔
549
                        if err != nil {
7✔
550
                                Config.Logger.Printf("error fetching workspace status for user %s\n err: %s", userName, err)
×
551
                        }
×
552
                        if status.Status == "Not Found" {
12✔
553
                                break
5✔
554
                        }
555
                        time.Sleep(5 * time.Second)
2✔
556
                }
557
                err = resetCurrentPaymodel(userName)
5✔
558
                if err != nil {
5✔
559
                        Config.Logger.Printf("unable to reset current paymodel for current user %s\nerr: %s", userName, err)
×
560
                }
×
561
        }()
562
}
563

564
// getBearerToken extracts the token from an `Authorization: Bearer <token>`
// request header. The scheme is matched case-insensitively. It returns "" when
// the header is absent, has no space-separated second part, or uses a
// different scheme.
func getBearerToken(r *http.Request) string {
	authHeader := r.Header.Get("Authorization")
	if authHeader == "" {
		return ""
	}
	// Split once only: everything after the first space belongs to the token.
	parts := strings.SplitN(authHeader, " ", 2)
	if len(parts) == 2 && strings.EqualFold(parts[0], "bearer") {
		return parts[1]
	}
	return ""
}
575

576
// ECS functions
577

578
// Function to create ECS cluster.
579
// TODO: NEED TO CALL THIS FUNCTION IF IT DOESN'T EXIST!!!
580
func createECSCluster(w http.ResponseWriter, r *http.Request) {
×
581
        userName := getCurrentUserName(r)
×
582
        payModel, err := getCurrentPayModel(userName)
×
583
        if payModel == nil {
×
584
                http.Error(w, "Paymodel has not been setup for user", http.StatusNotFound)
×
585
                return
×
586
        }
×
587
        if err != nil {
×
588
                http.Error(w, err.Error(), http.StatusInternalServerError)
×
589
                return
×
590
        }
×
591
        roleARN := "arn:aws:iam::" + payModel.AWSAccountId + ":role/csoc_adminvm"
×
592
        sess := session.Must(session.NewSession(&aws.Config{
×
593
                // TODO: Make this configurable
×
594
                Region: aws.String("us-east-1"),
×
595
        }))
×
596
        svc := NewSVC(sess, roleARN)
×
597

×
598
        result, err := svc.launchEcsCluster(userName)
×
599
        var reader *strings.Reader
×
600
        if err != nil {
×
601
                reader = strings.NewReader(err.Error())
×
602
                Config.Logger.Printf("Error: %s", err)
×
603
        } else {
×
604
                reader = strings.NewReader(result.String())
×
605
        }
×
606
        _, err = io.Copy(w, reader)
×
607
        if err != nil {
×
608
                Config.Logger.Printf("Error: %s", err)
×
609
        }
×
610
}
611

612
// Function to check status of ECS workspace.
613
var statusEcs = func(ctx context.Context, userName string, accessToken string, awsAcctID string) (*WorkspaceStatus, error) {
×
614
        roleARN := "arn:aws:iam::" + awsAcctID + ":role/csoc_adminvm"
×
615
        sess := session.Must(session.NewSession(&aws.Config{
×
616
                // TODO: Make this configurable
×
617
                Region: aws.String("us-east-1"),
×
618
        }))
×
619
        svc := NewSVC(sess, roleARN)
×
620
        result, err := svc.statusEcsWorkspace(ctx, userName, accessToken)
×
621
        if err != nil {
×
622
                Config.Logger.Printf("Error: %s", err)
×
623
                return nil, err
×
624
        }
×
625
        return result, nil
×
626
}
627

628
// Wrapper function to launch ECS workspace in a goroutine.
629
// Terminates workspace if launch fails for whatever reason
630
var launchEcsWorkspaceWrapper = func(userName string, hash string, accessToken string, payModel PayModel, envVars []EnvVar) {
×
631
        err := launchEcsWorkspace(userName, hash, accessToken, payModel, envVars)
×
632
        if err != nil {
×
633
                Config.Logger.Printf("Error: %s", err)
×
634
                // Terminate ECS workspace if launch fails.
×
635
                _, err = terminateEcsWorkspace(context.Background(), userName, accessToken, payModel.AWSAccountId)
×
636
                if err != nil {
×
637
                        Config.Logger.Printf("Error: %s", err)
×
638
                }
×
639
        }
640
}
641

642
// The files returned by this endpoint are mounted to the `/data` dir by the `ecs-ws-sidecar`
643
func mountFiles(w http.ResponseWriter, r *http.Request) {
2✔
644
        userName := getCurrentUserName(r)
2✔
645
        if userName == "" {
2✔
646
                http.Error(w, "Please login", http.StatusUnauthorized)
×
647
                return
×
648
        }
×
649

650
        // handle `/mount-files?file_path=abc` => return file contents
651
        filePath := r.URL.Query().Get("file_path")
2✔
652
        if filePath != "" {
3✔
653
                out, err := getMountFileContents(filePath, userName)
1✔
654
                if err != nil {
1✔
655
                        http.Error(w, err.Error(), http.StatusBadRequest)
×
656
                        return
×
657
                }
×
658
                _ = textResult.Execute(w, TextOutput{string(out)})
1✔
659
                return
1✔
660
        }
661

662
        // handle `/mount-files` without `file_path` parameter => list the files
663
        type file struct {
1✔
664
                FilePath        string `json:"file_path"`
1✔
665
                WorkspaceFlavor string `json:"workspace_flavor"`
1✔
666
        }
1✔
667
        fileList := []file{}
1✔
668

1✔
669
        // Ideally we would only return this if the user is running a nextflow workspace. But we have
1✔
670
        // no way of knowing. Instead, set `WorkspaceFlavor=nextflow` and the sidecar will not mount
1✔
671
        // the file if env var `WORKSPACE_FLAVOR` is not `nextflow`.
1✔
672
        fileList = append(fileList, file{
1✔
673
                FilePath:        "sample-nextflow-config.txt",
1✔
674
                WorkspaceFlavor: "nextflow",
1✔
675
        })
1✔
676
        // Look for any `license` configs in containers
1✔
677
        for _, v := range Config.ContainersMap {
2✔
678
                if v.License.Enabled {
2✔
679
                        fileList = append(fileList, file{
1✔
680
                                FilePath:        v.License.FilePath,
1✔
681
                                WorkspaceFlavor: v.License.WorkspaceFlavor,
1✔
682
                        })
1✔
683
                }
1✔
684
        }
685

686
        out, err := json.Marshal(fileList)
1✔
687
        if err != nil {
1✔
688
                http.Error(w, err.Error(), http.StatusInternalServerError)
×
689
                return
×
690
        }
×
691

692
        fmt.Fprint(w, string(out))
1✔
693
}
694

695
func getMountFileContents(fileId string, userName string) (string, error) {
1✔
696
        filePathConfigs := getLicenceFilePathConfigs()
1✔
697

1✔
698
        if fileId == "sample-nextflow-config.txt" {
2✔
699
                out, err := generateNextflowConfig(userName)
1✔
700
                if err != nil {
1✔
701
                        Config.Logger.Printf("unable to generate Nextflow config: %v", err)
×
702
                }
×
703
                return out, nil
1✔
704
        } else if filePathInLicenseConfigs(fileId, filePathConfigs) {
×
705
                // get g3auto kube secret
×
706
                g3autoName, g3autoKey, ok := getG3autoInfoForFilepath(fileId, filePathConfigs)
×
707
                if !ok {
×
708
                        return "", fmt.Errorf("could not get g3auto name and key for file-path '%s'", fileId)
×
709
                }
×
710
                clientset, err := getKubeClientSet()
×
711
                if err != nil {
×
712
                        Config.Logger.Printf("unable to get kube client set: %v", err)
×
713
                }
×
714
                out, err := getLicenseFromKubernetes(clientset, g3autoName, g3autoKey)
×
715
                if err != nil {
×
716
                        Config.Logger.Printf("unable to get license from kubernetes: %v", err)
×
717
                }
×
718
                return out, nil
×
719
        } else {
×
720
                return "", fmt.Errorf("unknown id '%s'", fileId)
×
721
        }
×
722
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc