• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

astronomer / astro-cli / 716c27ab-af54-4fbb-904c-9fa0a6da08dc

26 May 2026 02:32PM UTC coverage: 44.676% (+5.0%) from 39.653%
716c27ab-af54-4fbb-904c-9fa0a6da08dc

push

circleci

web-flow
Migrate CLI to v1 public API; retire v1beta1 and v1alpha1 (except IDE) (#2093)

1848 of 18362 new or added lines in 58 files covered. (10.06%)

925 existing lines in 15 files now uncovered.

24957 of 55862 relevant lines covered (44.68%)

7.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.5
/cloud/deployment/workerqueue/workerqueue.go
1
package workerqueue
2

3
import (
4
        "errors"
5
        "fmt"
6
        "io"
7
        "reflect"
8
        "sort"
9
        "strconv"
10
        "strings"
11

12
        "github.com/astronomer/astro-cli/astro-client-v1"
13
        "github.com/astronomer/astro-cli/cloud/deployment"
14
        "github.com/astronomer/astro-cli/pkg/ansi"
15
        "github.com/astronomer/astro-cli/pkg/input"
16
        "github.com/astronomer/astro-cli/pkg/printutil"
17
)
18

19
// Package-level string constants: the action names compared against the
// `action` argument of CreateOrUpdate, the name of the default worker queue,
// and message fragments about pod_cpu / pod_ram requests.
const (
20
        createAction       = "create"
21
        updateAction       = "update"
22
        // defaultQueueName is the queue name that Delete refuses to remove and that
        // IsKubernetesWorkerQueueInputValid requires.
        defaultQueueName   = "default"
23
        // NOTE(review): the two pod_* messages are not referenced in the visible part
        // of this file — presumably used elsewhere in the package; confirm.
        podCPUErrorMessage = "pod_cpu in the request. It can only be used with KubernetesExecutor"
24
        podRAMErrorMessage = "pod_ram in the request. It can only be used with KubernetesExecutor"
25
)
26

27
// Sentinel errors for worker queue operations. Functions in this file return
// them either directly or wrapped with %w plus a more specific message, so
// callers should match them with errors.Is.
var (
28
        errInvalidWorkerQueueOption  = errors.New("worker queue option is invalid")
29
        errCannotUpdateExistingQueue = errors.New("worker queue already exists")
30
        // errCannotCreateNewQueue and errQueueDoesNotExist carry the same message
        // text but are distinct error values.
        errCannotCreateNewQueue      = errors.New("worker queue does not exist")
31
        errInvalidNodePool           = errors.New("node pool selection failed")
32
        errInvalidAstroMachine       = errors.New("invalid astro machine selection failed")
33
        errQueueDoesNotExist         = errors.New("worker queue does not exist")
34
        errInvalidQueue              = errors.New("worker queue selection failed")
35
        errCannotDeleteDefaultQueue  = errors.New("default queue can not be deleted")
36
        // ErrNotSupported is the only exported sentinel in this block; it is wrapped
        // by IsKubernetesWorkerQueueInputValid.
        ErrNotSupported              = errors.New("does not support")
37
        errNoUseWorkerQueues         = errors.New("don't use 'worker_queues' to update default queue with KubernetesExecutor, use 'default_task_pod_cpu' and 'default_task_pod_memory' instead")
38
        errNoWorkerQueues            = errors.New("no worker queues found for this deployment")
39
)
40

41
// CreateOrUpdate creates a new worker queue or updates an existing worker queue for a deployment.
42
func CreateOrUpdate(ws, deploymentID, deploymentName, name, action, workerType string, wQueueMin, wQueueMax, wQueueConcurrency int, force bool, astroV1Client astrov1.APIClient, out io.Writer) error { //nolint
36✔
43
        var (
36✔
44
                requestedDeployment                  astrov1.Deployment
36✔
45
                err                                  error
36✔
46
                errHelp, succeededAction, nodePoolID string
36✔
47
                workerMachine                        astrov1.WorkerMachine
36✔
48
                queueToCreateOrUpdate                astrov1.WorkerQueueRequest
36✔
49
                queueToCreateOrUpdateHybrid          astrov1.HybridWorkerQueueRequest
36✔
50
                listToCreate                         []astrov1.WorkerQueueRequest
36✔
51
                existingQueues                       []astrov1.WorkerQueue
36✔
52
                hybridListToCreate                   []astrov1.HybridWorkerQueueRequest
36✔
53
                defaultOptions                       astrov1.WorkerQueueOptions
36✔
54
        )
36✔
55
        // get or select the deployment
36✔
56
        requestedDeployment, err = deployment.GetDeployment(ws, deploymentID, deploymentName, true, nil, astroV1Client)
36✔
57
        if err != nil {
40✔
58
                return err
4✔
59
        }
4✔
60

61
        if requestedDeployment.Id == "" {
33✔
62
                fmt.Printf("%s %s\n", deployment.NoDeploymentInWSMsg, ansi.Bold(ws))
1✔
63
                return nil
1✔
64
        }
1✔
65

66
        getDeploymentOptions := astrov1.GetDeploymentOptionsParams{
31✔
67
                DeploymentId: &requestedDeployment.Id,
31✔
68
        }
31✔
69
        deploymentOptions, err := deployment.GetPlatformDeploymentOptions("", getDeploymentOptions, astroV1Client)
31✔
70
        if err != nil {
34✔
71
                return err
3✔
72
        }
3✔
73
        defaultOptions = deploymentOptions.WorkerQueues
28✔
74

28✔
75
        if deployment.IsDeploymentStandard(*requestedDeployment.Type) || deployment.IsDeploymentDedicated(*requestedDeployment.Type) {
34✔
76
                // create listToCreate
6✔
77
                if requestedDeployment.WorkerQueues != nil {
12✔
78
                        queues := *requestedDeployment.WorkerQueues
6✔
79
                        for i := range *requestedDeployment.WorkerQueues {
9✔
80
                                existingQueueRequest := astrov1.WorkerQueueRequest{
3✔
81
                                        Name:              queues[i].Name,
3✔
82
                                        Id:                &queues[i].Id,
3✔
83
                                        IsDefault:         queues[i].IsDefault,
3✔
84
                                        MaxWorkerCount:    queues[i].MaxWorkerCount,
3✔
85
                                        MinWorkerCount:    queues[i].MinWorkerCount,
3✔
86
                                        WorkerConcurrency: queues[i].WorkerConcurrency,
3✔
87
                                        AstroMachine:      astrov1.WorkerQueueRequestAstroMachine(*queues[i].AstroMachine),
3✔
88
                                }
3✔
89
                                listToCreate = append(listToCreate, existingQueueRequest)
3✔
90
                        }
3✔
91
                }
92
                if name == "" {
8✔
93
                        name, err = getQueueName(name, action, &requestedDeployment, out)
2✔
94
                        if err != nil {
2✔
95
                                return err
×
96
                        }
×
97
                }
98
                if action == updateAction && workerType == "" {
6✔
99
                        // get workerType
×
100
                        for i := range listToCreate {
×
101
                                if name == listToCreate[i].Name {
×
102
                                        workerType = string(listToCreate[i].AstroMachine)
×
103
                                }
×
104
                        }
105
                }
106

107
                WorkerMachines := deploymentOptions.WorkerMachines
6✔
108
                // get the machine to use
6✔
109
                workerMachine, err = selectWorkerMachine(workerType, WorkerMachines, out)
6✔
110
                if err != nil {
7✔
111
                        return err
1✔
112
                }
1✔
113

114
                if wQueueConcurrency == 0 && action == createAction {
5✔
115
                        wQueueConcurrency = int(workerMachine.Concurrency.Default) // This is set based on the machine type the user chooses if not explicitly passed by the user
×
116
                }
×
117
                queueToCreateOrUpdate = astrov1.WorkerQueueRequest{
5✔
118
                        Name:              name,
5✔
119
                        IsDefault:         false, // cannot create a default queue
5✔
120
                        AstroMachine:      astrov1.WorkerQueueRequestAstroMachine(workerMachine.Name),
5✔
121
                        MinWorkerCount:    wQueueMin,         // use the value from the user input
5✔
122
                        MaxWorkerCount:    wQueueMax,         // use the value from the user input
5✔
123
                        WorkerConcurrency: wQueueConcurrency, // use the value from the user input
5✔
124
                }
5✔
125
                queueToCreateOrUpdate = SetWorkerQueueValues(wQueueMin, wQueueMax, wQueueConcurrency, queueToCreateOrUpdate, defaultOptions, &workerMachine)
5✔
126
        } else {
22✔
127
                // get the node poolID to use
22✔
128
                cluster, err := deployment.GetClusterByID("", *requestedDeployment.ClusterId, astroV1Client)
22✔
129
                if err != nil {
22✔
130
                        return err
×
131
                }
×
132
                nodePoolID, err = selectNodePool(workerType, *cluster.NodePools, out)
22✔
133
                if err != nil {
24✔
134
                        return err
2✔
135
                }
2✔
136
                queueToCreateOrUpdateHybrid = astrov1.HybridWorkerQueueRequest{
20✔
137
                        Name:              name,
20✔
138
                        IsDefault:         false, // cannot create a default queue
20✔
139
                        NodePoolId:        nodePoolID,
20✔
140
                        MinWorkerCount:    wQueueMin,         // use the value from the user input
20✔
141
                        MaxWorkerCount:    wQueueMax,         // use the value from the user input
20✔
142
                        WorkerConcurrency: wQueueConcurrency, // use the value from the user input
20✔
143
                }
20✔
144
                // create hybridListToCreate
20✔
145
                queues := *requestedDeployment.WorkerQueues
20✔
146
                for i := range *requestedDeployment.WorkerQueues {
36✔
147
                        existingHybridQueueRequest := astrov1.HybridWorkerQueueRequest{
16✔
148
                                Name:              queues[i].Name,
16✔
149
                                Id:                &queues[i].Id,
16✔
150
                                IsDefault:         queues[i].IsDefault,
16✔
151
                                MaxWorkerCount:    queues[i].MaxWorkerCount,
16✔
152
                                MinWorkerCount:    queues[i].MinWorkerCount,
16✔
153
                                WorkerConcurrency: queues[i].WorkerConcurrency,
16✔
154
                                NodePoolId:        *queues[i].NodePoolId,
16✔
155
                        }
16✔
156
                        hybridListToCreate = append(hybridListToCreate, existingHybridQueueRequest)
16✔
157
                }
16✔
158
                if name == "" {
27✔
159
                        queueToCreateOrUpdateHybrid.Name, err = getQueueName(name, action, &requestedDeployment, out)
7✔
160
                        if err != nil {
8✔
161
                                return err
1✔
162
                        }
1✔
163
                        name = queueToCreateOrUpdateHybrid.Name
6✔
164
                }
165
                queueToCreateOrUpdateHybrid = SetWorkerQueueValuesHybrid(wQueueMin, wQueueMax, wQueueConcurrency, queueToCreateOrUpdateHybrid, defaultOptions)
19✔
166
        }
167
        switch *requestedDeployment.Executor {
24✔
168
        case astrov1.DeploymentExecutorCELERY, astrov1.DeploymentExecutorASTRO:
18✔
169
                if deployment.IsDeploymentStandard(*requestedDeployment.Type) || deployment.IsDeploymentDedicated(*requestedDeployment.Type) {
23✔
170
                        err = IsHostedWorkerQueueInputValid(queueToCreateOrUpdate, defaultOptions, &workerMachine)
5✔
171
                        if err != nil {
6✔
172
                                return err
1✔
173
                        }
1✔
174
                } else {
13✔
175
                        err = IsWorkerQueueInputValid(queueToCreateOrUpdateHybrid, defaultOptions)
13✔
176
                        if err != nil {
15✔
177
                                return err
2✔
178
                        }
2✔
179
                }
180
        case astrov1.DeploymentExecutorKUBERNETES:
6✔
181
                // worker queues are only used with the kubernetes execuor for hybrid deployments
6✔
182
                if deployment.IsDeploymentStandard(*requestedDeployment.Type) || deployment.IsDeploymentDedicated(*requestedDeployment.Type) {
6✔
183
                        return errNoUseWorkerQueues
×
184
                }
×
185
                // -1 is the CLI default to allow users to request wQueueMin=0. Here we set it to default because MinWorkerCount is not used in Kubernetes Deployments
186
                queueToCreateOrUpdateHybrid.MinWorkerCount = -1
6✔
187
                err = IsKubernetesWorkerQueueInputValid(queueToCreateOrUpdateHybrid)
6✔
188
                if err != nil {
9✔
189
                        return err
3✔
190
                }
3✔
191
        }
192

193
        // sanitize all the existing queues based on executor
194
        existingQueues = sanitizeExistingQueues(*requestedDeployment.WorkerQueues, *requestedDeployment.Executor)
18✔
195
        // create listToCreate
18✔
196
        switch action {
18✔
197
        case createAction:
8✔
198
                if QueueExists(existingQueues, queueToCreateOrUpdate, queueToCreateOrUpdateHybrid) {
10✔
199
                        // create does not allow updating existing queues
2✔
200
                        errHelp = fmt.Sprintf("use worker queue update %s instead", name)
2✔
201
                        return fmt.Errorf("%w: %s", errCannotUpdateExistingQueue, errHelp)
2✔
202
                }
2✔
203
                // add the new queue to the list of worker queues
204
                listToCreate = append(listToCreate, queueToCreateOrUpdate) //nolint
6✔
205
                hybridListToCreate = append(hybridListToCreate, queueToCreateOrUpdateHybrid)
6✔
206
        case updateAction:
8✔
207
                if QueueExists(existingQueues, queueToCreateOrUpdate, queueToCreateOrUpdateHybrid) {
13✔
208
                        if !force {
6✔
209
                                i, _ := input.Confirm(
1✔
210
                                        fmt.Sprintf("\nAre you sure you want to %s the %s worker queue? If there are any tasks in your DAGs assigned to this worker queue, the tasks might get stuck in a queued state and fail to execute", action, ansi.Bold(name)))
1✔
211

1✔
212
                                if !i {
2✔
213
                                        fmt.Fprintf(out, "Canceling worker queue %s\n", action)
1✔
214
                                        return nil
1✔
215
                                }
1✔
216
                        }
217
                        // user requested an update and queueToCreateOrUpdate exists
218
                        listToCreate = updateQueueList(listToCreate, queueToCreateOrUpdate, requestedDeployment.Executor, wQueueMin, wQueueMax, wQueueConcurrency)
4✔
219
                        hybridListToCreate = updateHybridQueueList(hybridListToCreate, queueToCreateOrUpdateHybrid, requestedDeployment.Executor, wQueueMin, wQueueMax, wQueueConcurrency)
4✔
220
                } else {
3✔
221
                        // update does not allow creating new queues
3✔
222
                        if !reflect.DeepEqual(queueToCreateOrUpdate, astrov1.WorkerQueueRequest{}) {
4✔
223
                                errHelp = fmt.Sprintf("use worker queue create %s instead", queueToCreateOrUpdate.Name)
1✔
224
                        }
1✔
225
                        if !reflect.DeepEqual(queueToCreateOrUpdateHybrid, astrov1.HybridWorkerQueueRequest{}) {
5✔
226
                                errHelp = fmt.Sprintf("use worker queue create %s instead", queueToCreateOrUpdateHybrid.Name)
2✔
227
                        }
2✔
228
                        return fmt.Errorf("%w: %s", errCannotCreateNewQueue, errHelp)
3✔
229
                }
230
        }
231
        // update the deployment with the new list of worker queues
232
        err = deployment.Update(requestedDeployment.Id, "", ws, "", "", "", "", "", "", "", "", "", "", "", "", "", 0, 0, listToCreate, hybridListToCreate, []astrov1.DeploymentEnvironmentVariableRequest{}, nil, nil, nil, true, astroV1Client)
12✔
233
        if err != nil {
14✔
234
                return err
2✔
235
        }
2✔
236
        // change action to past tense
237
        succeededAction = fmt.Sprintf("%sd", action)
10✔
238

10✔
239
        fmt.Fprintf(out, "worker queue %s for %s in %s workspace %s\n", name, requestedDeployment.Name, ws, succeededAction)
10✔
240
        return nil
10✔
241
}
242

243
// SetWorkerQueueValues sets default values for MinWorkerCount, MaxWorkerCount and WorkerConcurrency if none were requested.
244
func SetWorkerQueueValues(wQueueMin, wQueueMax, wQueueConcurrency int, workerQueueToCreate astrov1.WorkerQueueRequest, workerQueueDefaultOptions astrov1.WorkerQueueOptions, machineOptions *astrov1.WorkerMachine) astrov1.WorkerQueueRequest {
11✔
245
        // -1 is the CLI default to allow users to request wQueueMin=0
11✔
246
        if wQueueMin == -1 {
16✔
247
                // set default value as user input did not have it
5✔
248
                workerQueueToCreate.MinWorkerCount = int(workerQueueDefaultOptions.MinWorkers.Default)
5✔
249
        }
5✔
250

251
        if wQueueMax == 0 {
13✔
252
                // set default value as user input did not have it
2✔
253
                workerQueueToCreate.MaxWorkerCount = int(workerQueueDefaultOptions.MaxWorkers.Default)
2✔
254
        }
2✔
255
        if wQueueConcurrency == 0 {
13✔
256
                // set default value as user input did not have it
2✔
257
                workerQueueToCreate.WorkerConcurrency = int(machineOptions.Concurrency.Default)
2✔
258
        }
2✔
259
        return workerQueueToCreate
11✔
260
}
261

262
// SetWorkerQueueValues sets default values for MinWorkerCount, MaxWorkerCount and WorkerConcurrency if none were requested.
263
func SetWorkerQueueValuesHybrid(wQueueMin, wQueueMax, wQueueConcurrency int, workerQueueToCreate astrov1.HybridWorkerQueueRequest, workerQueueDefaultOptions astrov1.WorkerQueueOptions) astrov1.HybridWorkerQueueRequest {
19✔
264
        // -1 is the CLI default to allow users to request wQueueMin=default
19✔
265
        if wQueueMin == -1 {
28✔
266
                // set default value as user input did not have it
9✔
267
                workerQueueToCreate.MinWorkerCount = int(workerQueueDefaultOptions.MinWorkers.Default)
9✔
268
        }
9✔
269
        if wQueueMax == 0 {
27✔
270
                // set default value as user input did not have it
8✔
271
                workerQueueToCreate.MaxWorkerCount = int(workerQueueDefaultOptions.MaxWorkers.Default)
8✔
272
        }
8✔
273
        if wQueueConcurrency == 0 {
27✔
274
                // set default value as user input did not have it
8✔
275
                workerQueueToCreate.WorkerConcurrency = int(workerQueueDefaultOptions.WorkerConcurrency.Default)
8✔
276
        }
8✔
277
        return workerQueueToCreate
19✔
278
}
279

280
// IsWorkerQueueInputValid checks if the requestedWorkerQueue adheres to the floor and ceiling set in the defaultOptions.
281
// if it adheres to them, it returns nil.
282
// errInvalidWorkerQueueOption is returned if min, max or concurrency are out of range.
283
// ErrNotSupported is returned if PodCPU or PodRAM are requested.
284
func IsWorkerQueueInputValid(requestedHybridWorkerQueue astrov1.HybridWorkerQueueRequest, defaultOptions astrov1.WorkerQueueOptions) error {
17✔
285
        var errorMessage string
17✔
286
        if !(requestedHybridWorkerQueue.MinWorkerCount >= int(defaultOptions.MinWorkers.Floor)) ||
17✔
287
                !(requestedHybridWorkerQueue.MinWorkerCount <= int(defaultOptions.MinWorkers.Ceiling)) {
20✔
288
                errorMessage = fmt.Sprintf("min worker count must be between %d and %d", int(defaultOptions.MinWorkers.Floor), int(defaultOptions.MinWorkers.Ceiling))
3✔
289
                return fmt.Errorf("%w: %s", errInvalidWorkerQueueOption, errorMessage)
3✔
290
        }
3✔
291
        if !(requestedHybridWorkerQueue.MaxWorkerCount >= int(defaultOptions.MaxWorkers.Floor)) ||
14✔
292
                !(requestedHybridWorkerQueue.MaxWorkerCount <= int(defaultOptions.MaxWorkers.Ceiling)) {
15✔
293
                errorMessage = fmt.Sprintf("max worker count must be between %d and %d", int(defaultOptions.MaxWorkers.Floor), int(defaultOptions.MaxWorkers.Ceiling))
1✔
294
                return fmt.Errorf("%w: %s", errInvalidWorkerQueueOption, errorMessage)
1✔
295
        }
1✔
296
        if !(requestedHybridWorkerQueue.WorkerConcurrency >= int(defaultOptions.WorkerConcurrency.Floor)) ||
13✔
297
                !(requestedHybridWorkerQueue.WorkerConcurrency <= int(defaultOptions.WorkerConcurrency.Ceiling)) {
14✔
298
                errorMessage = fmt.Sprintf("worker concurrency must be between %d and %d", int(defaultOptions.WorkerConcurrency.Floor), int(defaultOptions.WorkerConcurrency.Ceiling))
1✔
299
                return fmt.Errorf("%w: %s", errInvalidWorkerQueueOption, errorMessage)
1✔
300
        }
1✔
301
        return nil
12✔
302
}
303

304
// IsHostedWorkerQueueInputValid checks if the requestedWorkerQueue adheres to the floor and ceiling set in the defaultOptions and machineOptions.
305
// if it adheres to them, it returns nil.
306
// errInvalidWorkerQueueOption is returned if min, max or concurrency are out of range.
307
// ErrNotSupported is returned if PodCPU or PodRAM are requested.
308
func IsHostedWorkerQueueInputValid(requestedWorkerQueue astrov1.WorkerQueueRequest, defaultOptions astrov1.WorkerQueueOptions, machineOptions *astrov1.WorkerMachine) error {
9✔
309
        var errorMessage string
9✔
310
        if !(requestedWorkerQueue.MinWorkerCount >= int(defaultOptions.MinWorkers.Floor)) ||
9✔
311
                !(requestedWorkerQueue.MinWorkerCount <= int(defaultOptions.MinWorkers.Ceiling)) {
11✔
312
                errorMessage = fmt.Sprintf("min worker count must be between %d and %d", int(defaultOptions.MinWorkers.Floor), int(defaultOptions.MinWorkers.Ceiling))
2✔
313
                return fmt.Errorf("%w: %s", errInvalidWorkerQueueOption, errorMessage)
2✔
314
        }
2✔
315
        if !(requestedWorkerQueue.MaxWorkerCount >= int(defaultOptions.MaxWorkers.Floor)) ||
7✔
316
                !(requestedWorkerQueue.MaxWorkerCount <= int(defaultOptions.MaxWorkers.Ceiling)) {
8✔
317
                errorMessage = fmt.Sprintf("max worker count must be between %d and %d", int(defaultOptions.MaxWorkers.Floor), int(defaultOptions.MaxWorkers.Ceiling))
1✔
318
                return fmt.Errorf("%w: %s", errInvalidWorkerQueueOption, errorMessage)
1✔
319
        }
1✔
320
        // The floor for worker concurrency for hosted deployments is always 1 for all astro machines
321
        workerConcurrenyFloor := 1
6✔
322
        if !(requestedWorkerQueue.WorkerConcurrency >= workerConcurrenyFloor) ||
6✔
323
                !(requestedWorkerQueue.WorkerConcurrency <= int(machineOptions.Concurrency.Ceiling)) {
7✔
324
                errorMessage = fmt.Sprintf("worker concurrency must be between %d and %d", workerConcurrenyFloor, int(machineOptions.Concurrency.Ceiling))
1✔
325
                return fmt.Errorf("%w: %s", errInvalidWorkerQueueOption, errorMessage)
1✔
326
        }
1✔
327
        return nil
5✔
328
}
329

330
// IsKubernetesWorkerQueueInputValid checks if the requestedQueue has all the necessary properties
331
// required to create a worker queue for the KubernetesExecutor.
332
// errNotSupported is returned for any invalid properties.
333
func IsKubernetesWorkerQueueInputValid(queueToCreateOrUpdateHybrid astrov1.HybridWorkerQueueRequest) error {
10✔
334
        var errorMessage string
10✔
335

10✔
336
        if queueToCreateOrUpdateHybrid.Name != defaultQueueName {
14✔
337
                errorMessage = "a non default worker queue in the request. Rename the queue to default"
4✔
338
                return fmt.Errorf("%s %w %s", deployment.KubeExecutor, ErrNotSupported, errorMessage)
4✔
339
        }
4✔
340
        if queueToCreateOrUpdateHybrid.MaxWorkerCount != 0 {
7✔
341
                errorMessage = "maximum worker count in the request. It can only be used with CeleryExecutor"
1✔
342
                return fmt.Errorf("%s %w %s", deployment.KubeExecutor, ErrNotSupported, errorMessage)
1✔
343
        }
1✔
344
        if queueToCreateOrUpdateHybrid.WorkerConcurrency != 0 {
6✔
345
                errorMessage = "worker concurrency in the request. It can only be used with CeleryExecutor"
1✔
346
                return fmt.Errorf("%s %w %s", deployment.KubeExecutor, ErrNotSupported, errorMessage)
1✔
347
        }
1✔
348

349
        return nil
4✔
350
}
351

352
// QueueExists takes a []existingQueues and a queueToCreateOrUpdate as arguments
353
// It returns true if queueToCreateOrUpdate exists in []existingQueues
354
// It returns false if queueToCreateOrUpdate does not exist in []existingQueues
355
func QueueExists(existingQueues []astrov1.WorkerQueue, queueToCreateOrUpdate astrov1.WorkerQueueRequest, queueToCreateOrUpdateHybrid astrov1.HybridWorkerQueueRequest) bool {
28✔
356
        for _, queue := range existingQueues { //nolint
66✔
357
                if queue.Name == queueToCreateOrUpdateHybrid.Name {
53✔
358
                        // queueToCreateOrUpdate exists
15✔
359
                        return true
15✔
360
                }
15✔
361
                if queueToCreateOrUpdateHybrid.Id != nil {
27✔
362
                        if queue.Id == *queueToCreateOrUpdateHybrid.Id {
5✔
363
                                // queueToCreateOrUpdate exists
1✔
364
                                return true
1✔
365
                        }
1✔
366
                }
367
                if queue.Name == queueToCreateOrUpdate.Name {
22✔
368
                        // queueToCreateOrUpdate exists
×
369
                        return true
×
370
                }
×
371
                if queueToCreateOrUpdate.Id != nil {
25✔
372
                        if queue.Id == *queueToCreateOrUpdate.Id {
3✔
373
                                // queueToCreateOrUpdate exists
×
374
                                return true
×
375
                        }
×
376
                }
377
        }
378
        return false
12✔
379
}
380

381
func selectWorkerMachine(workerType string, workerMachines []astrov1.WorkerMachine, out io.Writer) (astrov1.WorkerMachine, error) {
6✔
382
        var (
6✔
383
                workerMachine astrov1.WorkerMachine
6✔
384
                errToReturn   error
6✔
385
        )
6✔
386

6✔
387
        switch workerType {
6✔
388
        case "":
3✔
389
                tab := printutil.Table{
3✔
390
                        Padding:        []int{5, 30, 20, 50},
3✔
391
                        DynamicPadding: true,
3✔
392
                        Header:         []string{"#", "WORKER TYPE", "CPU", "Memory"},
3✔
393
                }
3✔
394

3✔
395
                fmt.Println("No worker type was specified. Select the worker type to use")
3✔
396

3✔
397
                machineMap := map[string]astrov1.WorkerMachine{}
3✔
398

3✔
399
                for i := range workerMachines {
9✔
400
                        index := i + 1
6✔
401
                        tab.AddRow([]string{strconv.Itoa(index), string(workerMachines[i].Name), workerMachines[i].Spec.Cpu + " vCPU", workerMachines[i].Spec.Memory}, false)
6✔
402

6✔
403
                        machineMap[strconv.Itoa(index)] = workerMachines[i]
6✔
404
                }
6✔
405

406
                tab.Print(out)
3✔
407
                choice := input.Text("\n> ")
3✔
408
                selectedPool, ok := machineMap[choice]
3✔
409
                if !ok {
4✔
410
                        // returning an error as choice was not in nodePoolMap
1✔
411
                        errToReturn = fmt.Errorf("%w: invalid worker type: %s selected", errInvalidAstroMachine, choice)
1✔
412
                        return astrov1.WorkerMachine{}, errToReturn
1✔
413
                }
1✔
414
                return selectedPool, nil
2✔
415
        default:
3✔
416
                for _, workerMachine = range workerMachines {
6✔
417
                        if strings.EqualFold(string(workerMachine.Name), workerType) {
6✔
418
                                return workerMachine, nil
3✔
419
                        }
3✔
420
                }
421
                // did not find a matching workerType in any node pool
422
                errToReturn = fmt.Errorf("%w: workerType %s is not available for this deployment", errInvalidAstroMachine, workerType)
×
NEW
423
                return astrov1.WorkerMachine{}, errToReturn
×
424
        }
425
}
426

427
// selectNodePool takes workerType and []NodePool as arguments
428
// If user requested a workerType, then the matching nodePoolID is returned
429
// If user did not request a workerType, then it prompts the user to pick one
430
// An errInvalidNodePool is returned if a user chooses an option not on the list
431
func selectNodePool(workerType string, nodePools []astrov1.NodePool, out io.Writer) (string, error) {
26✔
432
        var (
26✔
433
                nodePoolID, message string
26✔
434
                errToReturn         error
26✔
435
        )
26✔
436

26✔
437
        message = "No worker type was specified. Select the worker type to use"
26✔
438
        switch workerType {
26✔
439
        case "":
3✔
440
                tab := printutil.Table{
3✔
441
                        Padding:        []int{5, 30, 20, 50},
3✔
442
                        DynamicPadding: true,
3✔
443
                        Header:         []string{"#", "WORKER TYPE", "ISDEFAULT", "ID"},
3✔
444
                }
3✔
445

3✔
446
                fmt.Println(message)
3✔
447

3✔
448
                sort.Slice(nodePools, func(i, j int) bool {
8✔
449
                        return nodePools[i].CreatedAt.Before(nodePools[j].CreatedAt)
5✔
450
                })
5✔
451

452
                nodePoolMap := map[string]astrov1.NodePool{}
3✔
453
                for i := range nodePools {
11✔
454
                        index := i + 1
8✔
455
                        tab.AddRow([]string{strconv.Itoa(index), nodePools[i].NodeInstanceType, strconv.FormatBool(nodePools[i].IsDefault), nodePools[i].Id}, false)
8✔
456

8✔
457
                        nodePoolMap[strconv.Itoa(index)] = nodePools[i]
8✔
458
                }
8✔
459

460
                tab.Print(out)
3✔
461
                choice := input.Text("\n> ")
3✔
462
                selectedPool, ok := nodePoolMap[choice]
3✔
463
                if !ok {
4✔
464
                        // returning an error as choice was not in nodePoolMap
1✔
465
                        errToReturn = fmt.Errorf("%w: invalid worker type: %s selected", errInvalidNodePool, choice)
1✔
466
                        return nodePoolID, errToReturn
1✔
467
                }
1✔
468
                return selectedPool.Id, nil
2✔
469
        default:
23✔
470
                // Get the nodePoolID for pool that matches workerType
23✔
471
                for i := range nodePools {
66✔
472
                        if nodePools[i].NodeInstanceType == workerType {
63✔
473
                                nodePoolID = nodePools[i].Id
20✔
474
                                return nodePoolID, errToReturn
20✔
475
                        }
20✔
476
                }
477
                // did not find a matching workerType in any node pool
478
                errToReturn = fmt.Errorf("%w: workerType %s is not available for this deployment", errInvalidNodePool, workerType)
3✔
479
                return nodePoolID, errToReturn
3✔
480
        }
481
}
482

483
// Delete deletes the specified worker queue from the deployment
484
// user gets prompted if no deployment was specified
485
// user gets prompted if no name for the queue to delete was specified
486
// An errQueueDoesNotExist is returned if queue to delete does not exist
487
// An errCannotDeleteDefaultQueue is returned if a user chooses the default queue
488
func Delete(ws, deploymentID, deploymentName, name string, force bool, astroV1Client astrov1.APIClient, out io.Writer) error { //nolint:gocognit
12✔
489
        var (
12✔
490
                requestedDeployment      astrov1.Deployment
12✔
491
                err                      error
12✔
492
                queueToDelete            astrov1.WorkerQueueRequest
12✔
493
                queueToDeleteHybrid      astrov1.HybridWorkerQueueRequest
12✔
494
                existingQueues           []astrov1.WorkerQueue
12✔
495
                workerQueuesToKeep       []astrov1.WorkerQueueRequest
12✔
496
                hybridWorkerQueuesToKeep []astrov1.HybridWorkerQueueRequest
12✔
497
        )
12✔
498
        // get or select the deployment
12✔
499
        requestedDeployment, err = deployment.GetDeployment(ws, deploymentID, deploymentName, true, nil, astroV1Client)
12✔
500
        if err != nil {
13✔
501
                return err
1✔
502
        }
1✔
503

504
        if requestedDeployment.Id == "" {
12✔
505
                fmt.Printf("%s %s\n", deployment.NoDeploymentInWSMsg, ansi.Bold(ws))
1✔
506
                return nil
1✔
507
        }
1✔
508

509
        // prompt for queue name if one was not provided
510
        if name == "" {
12✔
511
                name, err = selectQueue(requestedDeployment.WorkerQueues, out)
2✔
512
                if err != nil {
3✔
513
                        return err
1✔
514
                }
1✔
515
        }
516
        // check if default queue is being deleted
517
        if name == defaultQueueName {
10✔
518
                return errCannotDeleteDefaultQueue
1✔
519
        }
1✔
520
        queueToDelete = astrov1.WorkerQueueRequest{
8✔
521
                Name:      name,
8✔
522
                IsDefault: false, // cannot delete a default queue
8✔
523
        }
8✔
524
        queueToDeleteHybrid = astrov1.HybridWorkerQueueRequest{
8✔
525
                Name:      name,
8✔
526
                IsDefault: false, // cannot delete a default queue
8✔
527
        }
8✔
528

8✔
529
        // sanitize all the existing queues based on executor
8✔
530
        existingQueues = sanitizeExistingQueues(*requestedDeployment.WorkerQueues, *requestedDeployment.Executor)
8✔
531

8✔
532
        if QueueExists(existingQueues, queueToDelete, queueToDeleteHybrid) {
15✔
533
                if !force {
9✔
534
                        i, _ := input.Confirm(
2✔
535
                                fmt.Sprintf("\nAre you sure you want to delete the %s worker queue? If there are any tasks in your DAGs assigned to this worker queue, the tasks might get stuck in a queued state and fail to execute", ansi.Bold(queueToDelete.Name)))
2✔
536

2✔
537
                        if !i {
3✔
538
                                fmt.Fprintf(out, "Canceling worker queue deletion\n")
1✔
539
                                return nil
1✔
540
                        }
1✔
541
                }
542
                if deployment.IsDeploymentStandard(*requestedDeployment.Type) || deployment.IsDeploymentDedicated(*requestedDeployment.Type) {
8✔
543
                        // create a new workerQueuesToKeep without queueToDelete in it
2✔
544
                        for i := range existingQueues { //nolint
6✔
545
                                if existingQueues[i].Name != queueToDelete.Name {
6✔
546
                                        existingQueueRequest := astrov1.WorkerQueueRequest{
2✔
547
                                                Name:              existingQueues[i].Name,
2✔
548
                                                Id:                &existingQueues[i].Id,
2✔
549
                                                IsDefault:         existingQueues[i].IsDefault,
2✔
550
                                                MaxWorkerCount:    existingQueues[i].MaxWorkerCount,
2✔
551
                                                MinWorkerCount:    existingQueues[i].MinWorkerCount,
2✔
552
                                                WorkerConcurrency: existingQueues[i].WorkerConcurrency,
2✔
553
                                                AstroMachine:      astrov1.WorkerQueueRequestAstroMachine(*existingQueues[i].AstroMachine),
2✔
554
                                        }
2✔
555
                                        workerQueuesToKeep = append(workerQueuesToKeep, existingQueueRequest)
2✔
556
                                }
2✔
557
                        }
558
                        // update the deployment with the new list
559
                        err = deployment.Update(requestedDeployment.Id, "", ws, "", "", "", "", "", "", "", "", "", "", "", "", "", 0, 0, workerQueuesToKeep, hybridWorkerQueuesToKeep, []astrov1.DeploymentEnvironmentVariableRequest{}, nil, nil, nil, true, astroV1Client)
2✔
560
                        if err != nil {
2✔
561
                                return err
×
562
                        }
×
563
                        fmt.Fprintf(out, "worker queue %s for %s in %s workspace deleted\n", queueToDelete.Name, requestedDeployment.Name, ws)
2✔
564
                } else {
4✔
565
                        // create a new listToDeleteHybrid without queueToDeleteHybrid in it
4✔
566
                        for i := range existingQueues { //nolint
12✔
567
                                if existingQueues[i].Name != queueToDeleteHybrid.Name {
12✔
568
                                        existingQueueRequest := astrov1.HybridWorkerQueueRequest{
4✔
569
                                                Name:              existingQueues[i].Name,
4✔
570
                                                Id:                &existingQueues[i].Id,
4✔
571
                                                IsDefault:         existingQueues[i].IsDefault,
4✔
572
                                                MaxWorkerCount:    existingQueues[i].MaxWorkerCount,
4✔
573
                                                MinWorkerCount:    existingQueues[i].MinWorkerCount,
4✔
574
                                                WorkerConcurrency: existingQueues[i].WorkerConcurrency,
4✔
575
                                                NodePoolId:        *existingQueues[i].NodePoolId,
4✔
576
                                        }
4✔
577
                                        hybridWorkerQueuesToKeep = append(hybridWorkerQueuesToKeep, existingQueueRequest)
4✔
578
                                }
4✔
579
                        }
580
                        // update the deployment with the new list
581
                        err = deployment.Update(requestedDeployment.Id, "", ws, "", "", "", "", "", "", "", "", "", "", "", "", "", 0, 0, workerQueuesToKeep, hybridWorkerQueuesToKeep, []astrov1.DeploymentEnvironmentVariableRequest{}, nil, nil, nil, true, astroV1Client)
4✔
582
                        if err != nil {
5✔
583
                                return err
1✔
584
                        }
1✔
585
                        fmt.Fprintf(out, "worker queue %s for %s in %s workspace deleted\n", queueToDelete.Name, requestedDeployment.Name, ws)
3✔
586
                }
587
                return nil
5✔
588
        }
589
        // can not delete a queue that does not exist
590
        return fmt.Errorf("%w: %s", errQueueDoesNotExist, queueToDelete.Name)
1✔
591
}
592

593
// selectQueue takes []WorkerQueue and io.Writer as arguments
594
// user can select a queue to delete from the list and the name of the selected queue is returned
595
// An errInvalidQueue is returned if a user chooses a queue not on the list
596
func selectQueue(queueListIndex *[]astrov1.WorkerQueue, out io.Writer) (string, error) {
9✔
597
        var (
9✔
598
                errToReturn        error
9✔
599
                queueName, message string
9✔
600
                queueToDelete      astrov1.WorkerQueue
9✔
601
                queueList          []astrov1.WorkerQueue
9✔
602
        )
9✔
603
        if queueListIndex != nil {
17✔
604
                queueList = *queueListIndex
8✔
605
        } else {
9✔
606
                return "", errNoWorkerQueues
1✔
607
        }
1✔
608

609
        tab := printutil.Table{
8✔
610
                Padding:        []int{5, 30, 20, 50},
8✔
611
                DynamicPadding: true,
8✔
612
                Header:         []string{"#", "WORKER QUEUE", "ISDEFAULT", "ID"},
8✔
613
        }
8✔
614

8✔
615
        fmt.Println(message)
8✔
616

8✔
617
        sort.Slice(queueList, func(i, j int) bool {
18✔
618
                return queueList[i].Name < queueList[j].Name
10✔
619
        })
10✔
620

621
        queueMap := map[string]astrov1.WorkerQueue{}
8✔
622
        for i := range queueList {
26✔
623
                index := i + 1
18✔
624
                tab.AddRow([]string{strconv.Itoa(index), queueList[i].Name, strconv.FormatBool(queueList[i].IsDefault), queueList[i].Id}, false)
18✔
625

18✔
626
                queueMap[strconv.Itoa(index)] = queueList[i]
18✔
627
        }
18✔
628

629
        tab.Print(out)
8✔
630
        choice := input.Text("\n> ")
8✔
631
        queueToDelete, ok := queueMap[choice]
8✔
632
        if !ok {
12✔
633
                // returning an error as choice was not in queueMap
4✔
634
                errToReturn = fmt.Errorf("%w: invalid worker queue: %s selected", errInvalidQueue, choice)
4✔
635
                return queueName, errToReturn
4✔
636
        }
4✔
637
        return queueToDelete.Name, nil
4✔
638
}
639

640
// updateQueueList is used to merge existingQueues with the queueToUpdate. Based on the executor for the deployment, it
641
// sets the resources for CeleryExecutor and AstroExecutor and removes all resources for KubernetesExecutor as they get calculated based
642
// on the worker type.
643
//
644
//nolint:dupl
645
func updateQueueList(existingQueues []astrov1.WorkerQueueRequest, queueToUpdate astrov1.WorkerQueueRequest, executor *astrov1.DeploymentExecutor, wQueueMin, wQueueMax, wQueueConcurrency int) []astrov1.WorkerQueueRequest {
12✔
646
        for i, queue := range existingQueues { //nolint
28✔
647
                if queue.Name != queueToUpdate.Name {
26✔
648
                        continue
10✔
649
                }
650

651
                queue.Id = existingQueues[i].Id               // we need IDs to update existing queues
6✔
652
                queue.IsDefault = existingQueues[i].IsDefault // users can not change this
6✔
653
                switch *executor {
6✔
654
                case astrov1.DeploymentExecutorCELERY, astrov1.DeploymentExecutorASTRO:
6✔
655
                        if wQueueMin != -1 {
10✔
656
                                queue.MinWorkerCount = queueToUpdate.MinWorkerCount
4✔
657
                        }
4✔
658
                        if wQueueMax != 0 {
10✔
659
                                queue.MaxWorkerCount = queueToUpdate.MaxWorkerCount
4✔
660
                        }
4✔
661
                        if wQueueConcurrency != 0 {
10✔
662
                                queue.WorkerConcurrency = queueToUpdate.WorkerConcurrency
4✔
663
                        }
4✔
NEW
664
                case astrov1.DeploymentExecutorKUBERNETES:
×
665
                        // KubernetesExecutor calculates resources automatically based on the worker type
×
666
                        queue.WorkerConcurrency = 0
×
667
                        queue.MinWorkerCount = 0
×
668
                        queue.MaxWorkerCount = 0
×
669
                }
670
                queue.AstroMachine = queueToUpdate.AstroMachine
6✔
671
                existingQueues[i] = queue
6✔
672
                return existingQueues
6✔
673
        }
674
        return existingQueues
6✔
675
}
676

677
//nolint:dupl
678
func updateHybridQueueList(existingQueues []astrov1.HybridWorkerQueueRequest, queueToUpdate astrov1.HybridWorkerQueueRequest, executor *astrov1.DeploymentExecutor, wQueueMin, wQueueMax, wQueueConcurrency int) []astrov1.HybridWorkerQueueRequest {
4✔
679
        for i, queue := range existingQueues { //nolint
8✔
680
                if queue.Name != queueToUpdate.Name {
4✔
681
                        continue
×
682
                }
683

684
                queue.Id = existingQueues[i].Id               // we need IDs to update existing queues
4✔
685
                queue.IsDefault = existingQueues[i].IsDefault // users can not change this
4✔
686
                if *executor == astrov1.DeploymentExecutorCELERY {
6✔
687
                        if wQueueMin != -1 {
2✔
688
                                queue.MinWorkerCount = queueToUpdate.MinWorkerCount
×
689
                        }
×
690
                        if wQueueMax != 0 {
4✔
691
                                queue.MaxWorkerCount = queueToUpdate.MaxWorkerCount
2✔
692
                        }
2✔
693
                        if wQueueConcurrency != 0 {
4✔
694
                                queue.WorkerConcurrency = queueToUpdate.WorkerConcurrency
2✔
695
                        }
2✔
696
                } else if *executor == astrov1.DeploymentExecutorKUBERNETES {
4✔
697
                        // KubernetesExecutor calculates resources automatically based on the worker type
2✔
698
                        queue.WorkerConcurrency = 0
2✔
699
                        queue.MinWorkerCount = 0
2✔
700
                        queue.MaxWorkerCount = 0
2✔
701
                }
2✔
702
                queue.NodePoolId = queueToUpdate.NodePoolId
4✔
703
                existingQueues[i] = queue
4✔
704
                return existingQueues
4✔
705
        }
706
        return existingQueues
×
707
}
708

709
// getQueueName returns the name for a worker-queue. If action is to create, it prompts the user for a name to use.
710
// If action is to update, it makes the user select a queue from a list of existing ones.
711
// It returns errInvalidQueue if a user chooses a queue not on the list
712
func getQueueName(name, action string, requestedDeployment *astrov1.Deployment, out io.Writer) (string, error) {
12✔
713
        var (
12✔
714
                queueName string
12✔
715
                err       error
12✔
716
        )
12✔
717
        if name == "" {
24✔
718
                switch action {
12✔
719
                case createAction:
3✔
720
                        // prompt for name if one was not provided
3✔
721
                        queueName = input.Text("Enter a name for the worker queue\n> ")
3✔
722
                case updateAction:
4✔
723
                        // user selects a queue as no name was provided
4✔
724
                        queueName, err = selectQueue(requestedDeployment.WorkerQueues, out)
4✔
725
                        if err != nil {
6✔
726
                                return "", err
2✔
727
                        }
2✔
728
                }
729
        }
730
        return queueName, nil
10✔
731
}
732

733
// sanitizeExistingQueues takes a list of existing worker queues and removes fields that are not needed for queues based
734
// on the executor. For deployments with CeleryExecutor it returns a list of queues without PodCPU and PodRam.  For
735
// deployments with KubernetesExecutor it returns a list of queues with no resources as they get calculated
736
// based on the worker type.
737
func sanitizeExistingQueues(existingQueues []astrov1.WorkerQueue, executor astrov1.DeploymentExecutor) []astrov1.WorkerQueue {
28✔
738
        // sort queues by name
28✔
739
        sort.Slice(existingQueues, func(i, j int) bool {
39✔
740
                return existingQueues[i].Name < existingQueues[j].Name
11✔
741
        })
11✔
742
        for i := range existingQueues {
64✔
743
                if executor == astrov1.DeploymentExecutorCELERY {
66✔
744
                        existingQueues[i].PodMemory = ""
30✔
745
                        existingQueues[i].PodCpu = ""
30✔
746
                } else if executor == astrov1.DeploymentExecutorKUBERNETES {
39✔
747
                        // KubernetesExecutor calculates resources automatically based on the worker type
3✔
748
                        existingQueues[i].WorkerConcurrency = 0
3✔
749
                        existingQueues[i].MinWorkerCount = 0
3✔
750
                        existingQueues[i].MaxWorkerCount = 0
3✔
751
                        existingQueues[i].PodMemory = ""
3✔
752
                        existingQueues[i].PodCpu = ""
3✔
753
                }
3✔
754
        }
755
        return existingQueues
28✔
756
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc