• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

astronomer / astro-cli / 81faee59-a52f-4df7-ba5a-9ff72ec0e630

27 Oct 2025 01:40PM UTC coverage: 38.571% (+0.05%) from 38.524%
81faee59-a52f-4df7-ba5a-9ff72ec0e630

push

circleci

web-flow
Mark 1.38.0 as latest in godownloader (#1971)

24198 of 62736 relevant lines covered (38.57%)

10.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.23
/cloud/deploy/deploy.go
1
package deploy
2

3
import (
4
        "bufio"
5
        "bytes"
6
        httpContext "context"
7
        "fmt"
8
        "os"
9
        "path/filepath"
10
        "strings"
11
        "time"
12

13
        "github.com/astronomer/astro-cli/airflow"
14
        "github.com/astronomer/astro-cli/airflow/types"
15
        airflowversions "github.com/astronomer/astro-cli/airflow_versions"
16
        astrocore "github.com/astronomer/astro-cli/astro-client-core"
17
        astroplatformcore "github.com/astronomer/astro-cli/astro-client-platform-core"
18
        "github.com/astronomer/astro-cli/cloud/deployment"
19
        "github.com/astronomer/astro-cli/cloud/organization"
20
        "github.com/astronomer/astro-cli/config"
21
        "github.com/astronomer/astro-cli/docker"
22
        "github.com/astronomer/astro-cli/pkg/ansi"
23
        "github.com/astronomer/astro-cli/pkg/azure"
24
        "github.com/astronomer/astro-cli/pkg/fileutil"
25
        "github.com/astronomer/astro-cli/pkg/httputil"
26
        "github.com/astronomer/astro-cli/pkg/input"
27
        "github.com/astronomer/astro-cli/pkg/logger"
28
        "github.com/astronomer/astro-cli/pkg/util"
29
        "github.com/pkg/errors"
30
)
31

32
const (
33
        parse                  = "parse"
34
        astroDomain            = "astronomer.io"
35
        registryUsername       = "cli"
36
        runtimeImageLabel      = airflow.RuntimeImageLabel
37
        dagParseAllowedVersion = "4.1.0"
38

39
        composeImageBuildingPromptMsg     = "Building image..."
40
        composeSkipImageBuildingPromptMsg = "Skipping building image..."
41
        deploymentHeaderMsg               = "Authenticated to %s \n\n"
42

43
        warningInvalidImageNameMsg = "WARNING! The image in your Dockerfile '%s' is not based on Astro Runtime and is not supported. Change your Dockerfile with an image that pulls from 'quay.io/astronomer/astro-runtime' to proceed.\n"
44

45
        allTests                 = "all-tests"
46
        parseAndPytest           = "parse-and-all-tests"
47
        enableDagDeployMsg       = "DAG-only deploys are not enabled for this Deployment. Run 'astro deployment update %s --dag-deploy enable' to enable DAG-only deploys"
48
        dagDeployDisabled        = "dag deploy is not enabled for deployment"
49
        invalidWorkspaceID       = "Invalid workspace id %s was provided through the --workspace-id flag\n"
50
        errCiCdEnforcementUpdate = "cannot deploy since ci/cd enforcement is enabled for the deployment %s. Please use API Tokens instead"
51
)
52

53
var (
54
        pytestFile string
55
        dockerfile = "Dockerfile"
56

57
        deployImagePlatformSupport = []string{"linux/amd64"}
58

59
        // Monkey patched to write unit tests
60
        airflowImageHandler  = airflow.ImageHandlerInit
61
        containerHandlerInit = airflow.ContainerHandlerInit
62
        azureUploader        = azure.Upload
63
        canCiCdDeploy        = deployment.CanCiCdDeploy
64
        dagTarballVersion    = ""
65
        dagsUploadURL        = ""
66
        nextTag              = ""
67
)
68

69
var (
70
        errDagsParseFailed = errors.New("your local DAGs did not parse. Fix the listed errors or use `astro deploy [deployment-id] -f` to force deploy") //nolint:revive
71
        envFileMissing     = errors.New("Env file path is incorrect: ")                                                                                  //nolint:revive
72
)
73

74
var (
75
        sleepTime              = 90
76
        dagOnlyDeploySleepTime = 30
77
        tickNum                = 10
78
)
79

80
type deploymentInfo struct {
81
        deploymentID             string
82
        namespace                string
83
        deployImage              string
84
        currentVersion           string
85
        organizationID           string
86
        workspaceID              string
87
        webserverURL             string
88
        deploymentType           string
89
        desiredDagTarballVersion string
90
        dagDeployEnabled         bool
91
        cicdEnforcement          bool
92
        name                     string
93
        isRemoteExecutionEnabled bool
94
}
95

96
type InputDeploy struct {
97
        Path              string
98
        RuntimeID         string
99
        WsID              string
100
        Pytest            string
101
        EnvFile           string
102
        ImageName         string
103
        DeploymentName    string
104
        Prompt            bool
105
        Dags              bool
106
        Image             bool
107
        WaitForStatus     bool
108
        WaitTime          time.Duration
109
        DagsPath          string
110
        Description       string
111
        BuildSecretString string
112
}
113

114
// InputClientDeploy contains inputs for client image deployments
115
type InputClientDeploy struct {
116
        Path              string
117
        ImageName         string
118
        Platform          string
119
        BuildSecretString string
120
}
121

122
const accessYourDeploymentFmt = `
123

124
 Access your Deployment:
125

126
 Deployment View: %s
127
 Airflow UI: %s
128
`
129

130
func removeDagsFromDockerIgnore(fullpath string) error {
58✔
131
        f, err := os.Open(fullpath)
58✔
132
        if err != nil {
58✔
133
                return err
×
134
        }
×
135

136
        defer f.Close()
58✔
137

58✔
138
        var bs []byte
58✔
139
        buf := bytes.NewBuffer(bs)
58✔
140

58✔
141
        scanner := bufio.NewScanner(f)
58✔
142
        for scanner.Scan() {
96✔
143
                text := scanner.Text()
38✔
144
                if text != "dags/" {
57✔
145
                        _, err = buf.WriteString(text + "\n")
19✔
146
                        if err != nil {
19✔
147
                                return err
×
148
                        }
×
149
                }
150
        }
151

152
        if err := scanner.Err(); err != nil {
58✔
153
                return err
×
154
        }
×
155
        err = os.WriteFile(fullpath, bytes.Trim(buf.Bytes(), "\n"), 0o666) //nolint:gosec, mnd
58✔
156
        if err != nil {
58✔
157
                return err
×
158
        }
×
159

160
        return nil
58✔
161
}
162

163
func shouldIncludeMonitoringDag(deploymentType astroplatformcore.DeploymentType) bool {
20✔
164
        return !organization.IsOrgHosted() && !deployment.IsDeploymentDedicated(deploymentType) && !deployment.IsDeploymentStandard(deploymentType)
20✔
165
}
20✔
166

167
func deployDags(path, dagsPath, dagsUploadURL, currentRuntimeVersion string, deploymentType astroplatformcore.DeploymentType) (string, error) {
20✔
168
        if shouldIncludeMonitoringDag(deploymentType) {
36✔
169
                monitoringDagPath := filepath.Join(dagsPath, "astronomer_monitoring_dag.py")
16✔
170

16✔
171
                // Create monitoring dag file
16✔
172
                err := fileutil.WriteStringToFile(monitoringDagPath, airflow.Af2MonitoringDag)
16✔
173
                if err != nil {
16✔
174
                        return "", err
×
175
                }
×
176

177
                // Remove the monitoring dag file after the upload
178
                defer os.Remove(monitoringDagPath)
16✔
179
        }
180

181
        versionID, err := UploadBundle(path, dagsPath, dagsUploadURL, true, currentRuntimeVersion)
20✔
182
        if err != nil {
20✔
183
                return "", err
×
184
        }
×
185

186
        return versionID, nil
20✔
187
}
188

189
// Deploy pushes a new docker image
190
func Deploy(deployInput InputDeploy, platformCoreClient astroplatformcore.CoreClient, coreClient astrocore.CoreClient) error { //nolint
41✔
191
        c, err := config.GetCurrentContext()
41✔
192
        if err != nil {
42✔
193
                return err
1✔
194
        }
1✔
195

196
        if c.Domain == astroDomain {
43✔
197
                fmt.Printf(deploymentHeaderMsg, "Astro")
3✔
198
        } else {
40✔
199
                fmt.Printf(deploymentHeaderMsg, c.Domain)
37✔
200
        }
37✔
201

202
        deployInfo, err := getDeploymentInfo(deployInput.RuntimeID, deployInput.WsID, deployInput.DeploymentName, deployInput.Prompt, platformCoreClient, coreClient)
40✔
203
        if err != nil {
40✔
204
                return err
×
205
        }
×
206

207
        var dagsPath string
40✔
208
        if deployInput.DagsPath != "" {
55✔
209
                dagsPath = deployInput.DagsPath
15✔
210
        } else {
40✔
211
                dagsPath = filepath.Join(deployInput.Path, "dags")
25✔
212
        }
25✔
213

214
        var dagFiles []string
40✔
215
        if !deployInfo.isRemoteExecutionEnabled {
75✔
216
                dagFiles = fileutil.GetFilesWithSpecificExtension(dagsPath, ".py")
35✔
217
        }
35✔
218

219
        if deployInfo.cicdEnforcement {
41✔
220
                if !canCiCdDeploy(c.Token) {
2✔
221
                        return fmt.Errorf(errCiCdEnforcementUpdate, deployInfo.name) //nolint
1✔
222
                }
1✔
223
        }
224

225
        if deployInput.WsID != deployInfo.workspaceID {
40✔
226
                fmt.Printf(invalidWorkspaceID, deployInput.WsID)
1✔
227
                return nil
1✔
228
        }
1✔
229

230
        if deployInput.Image {
41✔
231
                if !deployInfo.dagDeployEnabled {
3✔
232
                        return fmt.Errorf(enableDagDeployMsg, deployInfo.deploymentID) //nolint
×
233
                }
×
234
        }
235

236
        deploymentURL, err := deployment.GetDeploymentURL(deployInfo.deploymentID, deployInfo.workspaceID)
38✔
237
        if err != nil {
38✔
238
                return err
×
239
        }
×
240
        createDeployRequest := astroplatformcore.CreateDeployRequest{
38✔
241
                Description: &deployInput.Description,
38✔
242
        }
38✔
243
        switch {
38✔
244
        case deployInput.Dags:
18✔
245
                createDeployRequest.Type = astroplatformcore.CreateDeployRequestTypeDAGONLY
18✔
246
        case deployInput.Image:
1✔
247
                createDeployRequest.Type = astroplatformcore.CreateDeployRequestTypeIMAGEONLY
1✔
248
        default:
19✔
249
                createDeployRequest.Type = astroplatformcore.CreateDeployRequestTypeIMAGEANDDAG
19✔
250
        }
251
        deploy, err := createDeploy(deployInfo.organizationID, deployInfo.deploymentID, createDeployRequest, platformCoreClient)
38✔
252
        if err != nil {
38✔
253
                return err
×
254
        }
×
255
        deployID := deploy.Id
38✔
256
        if deploy.DagsUploadUrl != nil {
76✔
257
                dagsUploadURL = *deploy.DagsUploadUrl
38✔
258
        } else {
38✔
259
                dagsUploadURL = ""
×
260
        }
×
261
        if deploy.ImageTag != "" {
38✔
262
                nextTag = deploy.ImageTag
×
263
        } else {
38✔
264
                nextTag = ""
38✔
265
        }
38✔
266

267
        if deployInput.Dags {
56✔
268
                if len(dagFiles) == 0 && config.CFG.ShowWarnings.GetBool() {
19✔
269
                        i, _ := input.Confirm("Warning: No DAGs found. This will delete any existing DAGs. Are you sure you want to deploy?")
1✔
270

1✔
271
                        if !i {
2✔
272
                                fmt.Println("Canceling deploy...")
1✔
273
                                return nil
1✔
274
                        }
1✔
275
                }
276
                if deployInput.Pytest != "" {
29✔
277
                        runtimeVersion, err := buildImage(deployInput.Path, deployInfo.currentVersion, deployInfo.deployImage, deployInput.ImageName, deployInfo.organizationID, deployInput.BuildSecretString, deployInfo.dagDeployEnabled, deployInfo.isRemoteExecutionEnabled, platformCoreClient)
12✔
278
                        if err != nil {
12✔
279
                                return err
×
280
                        }
×
281

282
                        err = parseOrPytestDAG(deployInput.Pytest, runtimeVersion, deployInput.EnvFile, deployInfo.deployImage, deployInfo.namespace, deployInput.BuildSecretString)
12✔
283
                        if err != nil {
14✔
284
                                return err
2✔
285
                        }
2✔
286
                }
287

288
                if !deployInfo.dagDeployEnabled {
16✔
289
                        return fmt.Errorf(enableDagDeployMsg, deployInfo.deploymentID) //nolint
1✔
290
                }
1✔
291

292
                fmt.Println("Initiating DAG deploy for: " + deployInfo.deploymentID)
14✔
293
                dagTarballVersion, err = deployDags(deployInput.Path, dagsPath, dagsUploadURL, deployInfo.currentVersion, astroplatformcore.DeploymentType(deployInfo.deploymentType))
14✔
294
                if err != nil {
14✔
295
                        if strings.Contains(err.Error(), dagDeployDisabled) {
×
296
                                return fmt.Errorf(enableDagDeployMsg, deployInfo.deploymentID) //nolint
×
297
                        }
×
298

299
                        return err
×
300
                }
301

302
                // finish deploy
303
                err = finalizeDeploy(deployID, deployInfo.deploymentID, deployInfo.organizationID, dagTarballVersion, deployInfo.dagDeployEnabled, platformCoreClient)
14✔
304
                if err != nil {
14✔
305
                        return err
×
306
                }
×
307

308
                if deployInput.WaitForStatus {
15✔
309
                        // Keeping wait timeout low since dag only deploy is faster
1✔
310
                        err = deployment.HealthPoll(deployInfo.deploymentID, deployInfo.workspaceID, dagOnlyDeploySleepTime, tickNum, int(deployInput.WaitTime.Seconds()), platformCoreClient)
1✔
311
                        if err != nil {
2✔
312
                                return err
1✔
313
                        }
1✔
314

315
                        fmt.Println(
×
316
                                "\nSuccessfully uploaded DAGs with version " + ansi.Bold(dagTarballVersion) + " to Astro. Navigate to the Airflow UI to confirm that your deploy was successful." +
×
317
                                        fmt.Sprintf(accessYourDeploymentFmt, ansi.Bold(deploymentURL), ansi.Bold(deployInfo.webserverURL)),
×
318
                        )
×
319

×
320
                        return nil
×
321
                }
322

323
                fmt.Println(
13✔
324
                        "\nSuccessfully uploaded DAGs with version " + ansi.Bold(
13✔
325
                                dagTarballVersion,
13✔
326
                        ) + " to Astro. Navigate to the Airflow UI to confirm that your deploy was successful. The Airflow UI takes about 1 minute to update." +
13✔
327
                                fmt.Sprintf(
13✔
328
                                        accessYourDeploymentFmt,
13✔
329
                                        ansi.Bold(deploymentURL),
13✔
330
                                        ansi.Bold(deployInfo.webserverURL),
13✔
331
                                ),
13✔
332
                )
13✔
333
        } else {
20✔
334
                fullpath := filepath.Join(deployInput.Path, ".dockerignore")
20✔
335
                fileExist, _ := fileutil.Exists(fullpath, nil)
20✔
336
                if fileExist {
40✔
337
                        err := removeDagsFromDockerIgnore(fullpath)
20✔
338
                        if err != nil {
20✔
339
                                return errors.Wrap(err, "Found dags entry in .dockerignore file. Remove this entry and try again")
×
340
                        }
×
341
                }
342
                envFileExists, _ := fileutil.Exists(deployInput.EnvFile, nil)
20✔
343
                if !envFileExists && deployInput.EnvFile != ".env" {
21✔
344
                        return fmt.Errorf("%w %s", envFileMissing, deployInput.EnvFile)
1✔
345
                }
1✔
346

347
                if deployInfo.dagDeployEnabled && len(dagFiles) == 0 && config.CFG.ShowWarnings.GetBool() && !deployInput.Image {
19✔
348
                        i, _ := input.Confirm("Warning: No DAGs found. This will delete any existing DAGs. Are you sure you want to deploy?")
×
349

×
350
                        if !i {
×
351
                                fmt.Println("Canceling deploy...")
×
352
                                return nil
×
353
                        }
×
354
                }
355

356
                // Build our image
357
                runtimeVersion, err := buildImage(deployInput.Path, deployInfo.currentVersion, deployInfo.deployImage, deployInput.ImageName, deployInfo.organizationID, deployInput.BuildSecretString, deployInfo.dagDeployEnabled, deployInfo.isRemoteExecutionEnabled, platformCoreClient)
19✔
358
                if err != nil {
19✔
359
                        return err
×
360
                }
×
361

362
                if len(dagFiles) > 0 {
26✔
363
                        err = parseOrPytestDAG(deployInput.Pytest, runtimeVersion, deployInput.EnvFile, deployInfo.deployImage, deployInfo.namespace, deployInput.BuildSecretString)
7✔
364
                        if err != nil {
8✔
365
                                return err
1✔
366
                        }
1✔
367
                } else {
12✔
368
                        fmt.Println("No DAGs found. Skipping testing...")
12✔
369
                }
12✔
370

371
                repository := deploy.ImageRepository
18✔
372
                // TODO: Resolve the edge case where two people push the same nextTag at the same time
18✔
373
                remoteImage := fmt.Sprintf("%s:%s", repository, nextTag)
18✔
374

18✔
375
                imageHandler := airflowImageHandler(deployInfo.deployImage)
18✔
376
                fmt.Println("Pushing image to Astronomer registry")
18✔
377
                _, err = imageHandler.Push(remoteImage, registryUsername, c.Token, false)
18✔
378
                if err != nil {
18✔
379
                        return err
×
380
                }
×
381

382
                if deployInfo.dagDeployEnabled && len(dagFiles) > 0 {
24✔
383
                        if !deployInput.Image {
12✔
384
                                dagTarballVersion, err = deployDags(deployInput.Path, dagsPath, dagsUploadURL, deployInfo.currentVersion, astroplatformcore.DeploymentType(deployInfo.deploymentType))
6✔
385
                                if err != nil {
6✔
386
                                        return err
×
387
                                }
×
388
                        } else {
×
389
                                fmt.Println("Image Deploy only. Skipping deploying DAG...")
×
390
                        }
×
391
                }
392
                // finish deploy
393
                err = finalizeDeploy(deployID, deployInfo.deploymentID, deployInfo.organizationID, dagTarballVersion, deployInfo.dagDeployEnabled, platformCoreClient)
18✔
394
                if err != nil {
18✔
395
                        return err
×
396
                }
×
397

398
                if deployInput.WaitForStatus {
20✔
399
                        err = deployment.HealthPoll(deployInfo.deploymentID, deployInfo.workspaceID, sleepTime, tickNum, int(deployInput.WaitTime.Seconds()), platformCoreClient)
2✔
400
                        if err != nil {
4✔
401
                                return err
2✔
402
                        }
2✔
403
                }
404

405
                fmt.Println("Successfully pushed image to Astronomer registry. Navigate to the Astronomer UI for confirmation that your deploy was successful. To deploy dags only run astro deploy --dags." +
16✔
406
                        fmt.Sprintf(accessYourDeploymentFmt, ansi.Bold("https://"+deploymentURL), ansi.Bold("https://"+deployInfo.webserverURL)))
16✔
407
        }
408

409
        return nil
29✔
410
}
411

412
func getDeploymentInfo(
413
        deploymentID, wsID, deploymentName string,
414
        prompt bool,
415
        platformCoreClient astroplatformcore.CoreClient,
416
        coreClient astrocore.CoreClient,
417
) (deploymentInfo, error) {
40✔
418
        // Use config deployment if provided
40✔
419
        if deploymentID == "" {
54✔
420
                deploymentID = config.CFG.ProjectDeployment.GetProjectString()
14✔
421
                if deploymentID != "" {
14✔
422
                        fmt.Printf("Deployment ID found in the config file. This Deployment ID will be used for the deploy\n")
×
423
                }
×
424
        }
425

426
        if deploymentID != "" && deploymentName != "" {
48✔
427
                fmt.Printf("Both a Deployment ID and Deployment name have been supplied. The Deployment ID %s will be used for the Deploy\n", deploymentID)
8✔
428
        }
8✔
429

430
        // check if deploymentID or if force prompt was requested was given by user
431
        if deploymentID == "" || prompt {
67✔
432
                currentDeployment, err := deployment.GetDeployment(wsID, deploymentID, deploymentName, false, nil, platformCoreClient, coreClient)
27✔
433
                if err != nil {
27✔
434
                        return deploymentInfo{}, err
×
435
                }
×
436
                coreDeployment, err := deployment.CoreGetDeployment(currentDeployment.OrganizationId, currentDeployment.Id, platformCoreClient)
27✔
437
                if err != nil {
27✔
438
                        return deploymentInfo{}, err
×
439
                }
×
440
                var desiredDagTarballVersion string
27✔
441
                if coreDeployment.DesiredDagTarballVersion != nil {
45✔
442
                        desiredDagTarballVersion = *coreDeployment.DesiredDagTarballVersion
18✔
443
                } else {
27✔
444
                        desiredDagTarballVersion = ""
9✔
445
                }
9✔
446

447
                return deploymentInfo{
27✔
448
                        currentDeployment.Id,
27✔
449
                        currentDeployment.Namespace,
27✔
450
                        airflow.ImageName(currentDeployment.Namespace, "latest"),
27✔
451
                        currentDeployment.RuntimeVersion,
27✔
452
                        currentDeployment.OrganizationId,
27✔
453
                        currentDeployment.WorkspaceId,
27✔
454
                        currentDeployment.WebServerUrl,
27✔
455
                        string(*currentDeployment.Type),
27✔
456
                        desiredDagTarballVersion,
27✔
457
                        currentDeployment.IsDagDeployEnabled,
27✔
458
                        currentDeployment.IsCicdEnforced,
27✔
459
                        currentDeployment.Name,
27✔
460
                        deployment.IsRemoteExecutionEnabled(&currentDeployment),
27✔
461
                }, nil
27✔
462
        }
463
        c, err := config.GetCurrentContext()
13✔
464
        if err != nil {
13✔
465
                return deploymentInfo{}, err
×
466
        }
×
467
        deployInfo, err := getImageName(deploymentID, c.Organization, platformCoreClient)
13✔
468
        if err != nil {
13✔
469
                return deploymentInfo{}, err
×
470
        }
×
471
        deployInfo.deploymentID = deploymentID
13✔
472
        return deployInfo, nil
13✔
473
}
474

475
func parseOrPytestDAG(pytest, runtimeVersion, envFile, deployImage, namespace, buildSecretString string) error {
19✔
476
        validDAGParseVersion := airflowversions.CompareRuntimeVersions(runtimeVersion, dagParseAllowedVersion) >= 0
19✔
477
        if !validDAGParseVersion {
19✔
478
                fmt.Println("\nruntime image is earlier than 4.1.0, this deploy will skip DAG parse...")
×
479
        }
×
480

481
        containerHandler, err := containerHandlerInit(config.WorkingPath, envFile, "Dockerfile", namespace)
19✔
482
        if err != nil {
19✔
483
                return err
×
484
        }
×
485

486
        switch {
19✔
487
        case pytest == parse && validDAGParseVersion:
7✔
488
                // parse dags
7✔
489
                fmt.Println("Testing image...")
7✔
490
                err := parseDAGs(deployImage, buildSecretString, containerHandler)
7✔
491
                if err != nil {
9✔
492
                        return err
2✔
493
                }
2✔
494
        case pytest != "" && pytest != parse && pytest != parseAndPytest:
6✔
495
                // check pytests
6✔
496
                fmt.Println("Testing image...")
6✔
497
                err := checkPytest(pytest, deployImage, buildSecretString, containerHandler)
6✔
498
                if err != nil {
7✔
499
                        return err
1✔
500
                }
1✔
501
        case pytest == parseAndPytest:
6✔
502
                // parse dags and check pytests
6✔
503
                fmt.Println("Testing image...")
6✔
504
                err := parseDAGs(deployImage, buildSecretString, containerHandler)
6✔
505
                if err != nil {
6✔
506
                        return err
×
507
                }
×
508

509
                err = checkPytest(pytest, deployImage, buildSecretString, containerHandler)
6✔
510
                if err != nil {
6✔
511
                        return err
×
512
                }
×
513
        }
514
        return nil
16✔
515
}
516

517
func parseDAGs(deployImage, buildSecretString string, containerHandler airflow.ContainerHandler) error {
13✔
518
        if !config.CFG.SkipParse.GetBool() && !util.CheckEnvBool(os.Getenv("ASTRONOMER_SKIP_PARSE")) {
26✔
519
                err := containerHandler.Parse("", deployImage, buildSecretString)
13✔
520
                if err != nil {
15✔
521
                        fmt.Println(err)
2✔
522
                        return errDagsParseFailed
2✔
523
                }
2✔
524
        } else {
×
525
                fmt.Println("Skipping parsing dags due to skip parse being set to true in either the config.yaml or local environment variables")
×
526
        }
×
527

528
        return nil
11✔
529
}
530

531
// Validate code with pytest
532
func checkPytest(pytest, deployImage, buildSecretString string, containerHandler airflow.ContainerHandler) error {
14✔
533
        if pytest != allTests && pytest != parseAndPytest {
18✔
534
                pytestFile = pytest
4✔
535
        }
4✔
536

537
        exitCode, err := containerHandler.Pytest(pytestFile, "", deployImage, "", buildSecretString)
14✔
538
        if err != nil {
17✔
539
                if strings.Contains(exitCode, "1") { // exit code is 1 meaning tests failed
4✔
540
                        return errors.New("at least 1 pytest in your tests directory failed. Fix the issues listed or rerun the command without the '--pytest' flag to deploy")
1✔
541
                }
1✔
542
                return errors.Wrap(err, "Something went wrong while Pytesting your DAGs,\nif the issue persists rerun the command without the '--pytest' flag to deploy")
2✔
543
        }
544

545
        fmt.Print("\nAll Pytests passed!\n")
11✔
546
        return err
11✔
547
}
548

549
func getImageName(deploymentID, organizationID string, platformCoreClient astroplatformcore.CoreClient) (deploymentInfo, error) {
13✔
550
        resp, err := platformCoreClient.GetDeploymentWithResponse(httpContext.Background(), organizationID, deploymentID)
13✔
551
        if err != nil {
13✔
552
                return deploymentInfo{}, err
×
553
        }
×
554

555
        err = astrocore.NormalizeAPIError(resp.HTTPResponse, resp.Body)
13✔
556
        if err != nil {
13✔
557
                return deploymentInfo{}, err
×
558
        }
×
559

560
        currentVersion := resp.JSON200.RuntimeVersion
13✔
561
        namespace := resp.JSON200.Namespace
13✔
562
        workspaceID := resp.JSON200.WorkspaceId
13✔
563
        webserverURL := resp.JSON200.WebServerUrl
13✔
564
        dagDeployEnabled := resp.JSON200.IsDagDeployEnabled
13✔
565
        cicdEnforcement := resp.JSON200.IsCicdEnforced
13✔
566
        isRemoteExecutionEnabled := deployment.IsRemoteExecutionEnabled(resp.JSON200)
13✔
567
        var desiredDagTarballVersion string
13✔
568
        if resp.JSON200.DesiredDagTarballVersion != nil {
18✔
569
                desiredDagTarballVersion = *resp.JSON200.DesiredDagTarballVersion
5✔
570
        } else {
13✔
571
                desiredDagTarballVersion = ""
8✔
572
        }
8✔
573

574
        // We use latest and keep this tag around after deployments to keep subsequent deploys quick
575
        deployImage := airflow.ImageName(namespace, "latest")
13✔
576

13✔
577
        return deploymentInfo{
13✔
578
                namespace:                namespace,
13✔
579
                deployImage:              deployImage,
13✔
580
                currentVersion:           currentVersion,
13✔
581
                organizationID:           organizationID,
13✔
582
                workspaceID:              workspaceID,
13✔
583
                webserverURL:             webserverURL,
13✔
584
                dagDeployEnabled:         dagDeployEnabled,
13✔
585
                desiredDagTarballVersion: desiredDagTarballVersion,
13✔
586
                cicdEnforcement:          cicdEnforcement,
13✔
587
                isRemoteExecutionEnabled: isRemoteExecutionEnabled,
13✔
588
        }, nil
13✔
589
}
590

591
func buildImageWithoutDags(path, buildSecretString string, imageHandler airflow.ImageHandler) error {
19✔
592
        // flag to determine if we are setting the dags folder in dockerignore
19✔
593
        dagsIgnoreSet := false
19✔
594
        // flag to determine if dockerignore file was created on runtime
19✔
595
        dockerIgnoreCreate := false
19✔
596
        fullpath := filepath.Join(path, ".dockerignore")
19✔
597

19✔
598
        defer func() {
38✔
599
                // remove dags from .dockerignore file if we set it
19✔
600
                if dagsIgnoreSet {
38✔
601
                        removeDagsFromDockerIgnore(fullpath) //nolint:errcheck
19✔
602
                }
19✔
603
                // remove created docker ignore file
604
                if dockerIgnoreCreate {
19✔
605
                        os.Remove(fullpath)
×
606
                }
×
607
        }()
608

609
        fileExist, _ := fileutil.Exists(fullpath, nil)
19✔
610
        if !fileExist {
19✔
611
                // Create a dockerignore file and add the dags folder entry
×
612
                err := fileutil.WriteStringToFile(fullpath, "dags/")
×
613
                if err != nil {
×
614
                        return err
×
615
                }
×
616
                dockerIgnoreCreate = true
×
617
        }
618
        lines, err := fileutil.Read(fullpath)
19✔
619
        if err != nil {
19✔
620
                return err
×
621
        }
×
622
        contains, _ := fileutil.Contains(lines, "dags/")
19✔
623
        if !contains {
38✔
624
                f, err := os.OpenFile(fullpath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) //nolint:mnd
19✔
625
                if err != nil {
19✔
626
                        return err
×
627
                }
×
628

629
                defer f.Close()
19✔
630

19✔
631
                if _, err := f.WriteString("\ndags/"); err != nil {
19✔
632
                        return err
×
633
                }
×
634

635
                dagsIgnoreSet = true
19✔
636
        }
637
        err = imageHandler.Build("", buildSecretString, types.ImageBuildConfig{Path: path, TargetPlatforms: deployImagePlatformSupport})
19✔
638
        if err != nil {
19✔
639
                return err
×
640
        }
×
641

642
        // remove dags from .dockerignore file if we set it
643
        if dagsIgnoreSet {
38✔
644
                err = removeDagsFromDockerIgnore(fullpath)
19✔
645
                if err != nil {
19✔
646
                        return err
×
647
                }
×
648
        }
649

650
        return nil
19✔
651
}
652

653
func buildImage(path, currentVersion, deployImage, imageName, organizationID, buildSecretString string, dagDeployEnabled, isRemoteExecutionEnabled bool, platformCoreClient astroplatformcore.CoreClient) (version string, err error) {
34✔
654
        imageHandler := airflowImageHandler(deployImage)
34✔
655

34✔
656
        if imageName == "" {
61✔
657
                // Build our image
27✔
658
                fmt.Println(composeImageBuildingPromptMsg)
27✔
659

27✔
660
                if dagDeployEnabled || isRemoteExecutionEnabled {
46✔
661
                        err := buildImageWithoutDags(path, buildSecretString, imageHandler)
19✔
662
                        if err != nil {
19✔
663
                                return "", err
×
664
                        }
×
665
                } else {
8✔
666
                        err := imageHandler.Build("", buildSecretString, types.ImageBuildConfig{Path: path, TargetPlatforms: deployImagePlatformSupport})
8✔
667
                        if err != nil {
9✔
668
                                return "", err
1✔
669
                        }
1✔
670
                }
671
        } else {
7✔
672
                // skip build if an imageName is passed
7✔
673
                fmt.Println(composeSkipImageBuildingPromptMsg)
7✔
674

7✔
675
                err := imageHandler.TagLocalImage(imageName)
7✔
676
                if err != nil {
7✔
677
                        return "", err
×
678
                }
×
679
        }
680

681
        // parse dockerfile
682
        cmds, err := docker.ParseFile(filepath.Join(path, dockerfile))
33✔
683
        if err != nil {
34✔
684
                return "", errors.Wrapf(err, "failed to parse dockerfile: %s", filepath.Join(path, dockerfile))
1✔
685
        }
1✔
686

687
        DockerfileImage := docker.GetImageFromParsedFile(cmds)
32✔
688

32✔
689
        version, err = imageHandler.GetLabel("", runtimeImageLabel)
32✔
690
        if err != nil {
32✔
691
                fmt.Println("unable get runtime version from image")
×
692
        }
×
693

694
        if config.CFG.ShowWarnings.GetBool() && version == "" {
32✔
695
                fmt.Printf(warningInvalidImageNameMsg, DockerfileImage)
×
696
                fmt.Println("Canceling deploy...")
×
697
                os.Exit(1)
×
698
        }
×
699

700
        resp, err := platformCoreClient.GetDeploymentOptionsWithResponse(httpContext.Background(), organizationID, &astroplatformcore.GetDeploymentOptionsParams{})
32✔
701
        if err != nil {
33✔
702
                return "", err
1✔
703
        }
1✔
704
        err = astrocore.NormalizeAPIError(resp.HTTPResponse, resp.Body)
31✔
705
        if err != nil {
31✔
706
                return "", err
×
707
        }
×
708
        deploymentOptionsRuntimeVersions := []string{}
31✔
709
        for _, runtimeRelease := range resp.JSON200.RuntimeReleases {
217✔
710
                deploymentOptionsRuntimeVersions = append(deploymentOptionsRuntimeVersions, runtimeRelease.Version)
186✔
711
        }
186✔
712

713
        if !ValidRuntimeVersion(currentVersion, version, deploymentOptionsRuntimeVersions) {
31✔
714
                fmt.Println("Canceling deploy...")
×
715
                os.Exit(1)
×
716
        }
×
717

718
        WarnIfNonLatestVersion(version, httputil.NewHTTPClient())
31✔
719

31✔
720
        return version, nil
31✔
721
}
722

723
// finalize deploy
724
func finalizeDeploy(deployID, deploymentID, organizationID, dagTarballVersion string, dagDeploy bool, platformCoreClient astroplatformcore.CoreClient) error {
32✔
725
        finalizeDeployRequest := astroplatformcore.FinalizeDeployRequest{}
32✔
726
        if dagDeploy {
54✔
727
                finalizeDeployRequest.DagTarballVersion = &dagTarballVersion
22✔
728
        }
22✔
729
        resp, err := platformCoreClient.FinalizeDeployWithResponse(httpContext.Background(), organizationID, deploymentID, deployID, finalizeDeployRequest)
32✔
730
        if err != nil {
32✔
731
                return err
×
732
        }
×
733
        err = astrocore.NormalizeAPIError(resp.HTTPResponse, resp.Body)
32✔
734
        if err != nil {
32✔
735
                return err
×
736
        }
×
737
        if resp.JSON200.DagTarballVersion != nil {
64✔
738
                fmt.Println("Deployed DAG bundle: ", *resp.JSON200.DagTarballVersion)
32✔
739
        }
32✔
740
        if resp.JSON200.ImageTag != "" {
64✔
741
                fmt.Println("Deployed Image Tag: ", resp.JSON200.ImageTag)
32✔
742
        }
32✔
743
        return nil
32✔
744
}
745

746
func createDeploy(organizationID, deploymentID string, request astroplatformcore.CreateDeployRequest, platformCoreClient astroplatformcore.CoreClient) (*astroplatformcore.Deploy, error) {
38✔
747
        resp, err := platformCoreClient.CreateDeployWithResponse(httpContext.Background(), organizationID, deploymentID, request)
38✔
748
        if err != nil {
38✔
749
                return nil, err
×
750
        }
×
751
        err = astrocore.NormalizeAPIError(resp.HTTPResponse, resp.Body)
38✔
752
        if err != nil {
38✔
753
                return nil, err
×
754
        }
×
755
        return resp.JSON200, err
38✔
756
}
757

758
func ValidRuntimeVersion(currentVersion, tag string, deploymentOptionsRuntimeVersions []string) bool {
41✔
759
        // Allow old deployments which do not have runtimeVersion tag
41✔
760
        if currentVersion == "" {
42✔
761
                return true
1✔
762
        }
1✔
763

764
        // Check that the tag is not a downgrade
765
        if airflowversions.CompareRuntimeVersions(tag, currentVersion) < 0 {
43✔
766
                fmt.Printf("Cannot deploy a downgraded Astro Runtime version. Modify your Astro Runtime version to %s or higher in your Dockerfile\n", currentVersion)
3✔
767
                return false
3✔
768
        }
3✔
769

770
        // Check that the tag is supported by the deployment
771
        tagInDeploymentOptions := false
37✔
772
        for _, runtimeVersion := range deploymentOptionsRuntimeVersions {
100✔
773
                if airflowversions.CompareRuntimeVersions(tag, runtimeVersion) == 0 {
99✔
774
                        tagInDeploymentOptions = true
36✔
775
                        break
36✔
776
                }
777
        }
778
        if !tagInDeploymentOptions {
38✔
779
                fmt.Println("Cannot deploy an unsupported Astro Runtime version. Modify your Astro Runtime version to a supported version in your Dockerfile")
1✔
780
                fmt.Printf("Supported versions: %s\n", strings.Join(deploymentOptionsRuntimeVersions, ", "))
1✔
781
                return false
1✔
782
        }
1✔
783

784
        // If upgrading from Airflow 2 to Airflow 3, we require at least Runtime 12.0.0 (Airflow 2.10.0)
785
        currentVersionAirflowMajorVersion := airflowversions.AirflowMajorVersionForRuntimeVersion(currentVersion)
36✔
786
        tagAirflowMajorVersion := airflowversions.AirflowMajorVersionForRuntimeVersion(tag)
36✔
787
        if currentVersionAirflowMajorVersion == "2" && tagAirflowMajorVersion == "3" {
38✔
788
                if airflowversions.CompareRuntimeVersions(currentVersion, "12.0.0") < 0 {
3✔
789
                        fmt.Println("Can only upgrade deployment from Airflow 2 to Airflow 3 with deployment at Astro Runtime 12.0.0 or higher")
1✔
790
                        return false
1✔
791
                }
1✔
792
        }
793

794
        return true
35✔
795
}
796

797
func WarnIfNonLatestVersion(version string, httpClient *httputil.HTTPClient) {
34✔
798
        client := airflowversions.NewClient(httpClient, false, false)
34✔
799
        latestRuntimeVersion, err := airflowversions.GetDefaultImageTag(client, "", "", false)
34✔
800
        if err != nil {
36✔
801
                logger.Debugf("unable to get latest runtime version: %s", err)
2✔
802
                return
2✔
803
        }
2✔
804

805
        if airflowversions.CompareRuntimeVersions(version, latestRuntimeVersion) < 0 {
64✔
806
                fmt.Printf("WARNING! You are currently running Astro Runtime Version %s\nConsider upgrading to the latest version, Astro Runtime %s\n", version, latestRuntimeVersion)
32✔
807
        }
32✔
808
}
809

810
// DeployClientImage handles the client deploy functionality
811
func DeployClientImage(deployInput InputClientDeploy) error { //nolint:gocritic
6✔
812
        c, err := config.GetCurrentContext()
6✔
813
        if err != nil {
6✔
814
                return errors.Wrap(err, "failed to get current context")
×
815
        }
×
816

817
        // Get the remote client registry endpoint from config
818
        registryEndpoint := config.CFG.RemoteClientRegistry.GetString()
6✔
819
        if registryEndpoint == "" {
7✔
820
                fmt.Println("The Astro CLI is not configured to push client images to your private registry.")
1✔
821
                fmt.Println("For remote Deployments, client images must be stored in your private registry, not in Astronomer managed registries.")
1✔
822
                fmt.Println("Please provide your private registry information so the Astro CLI can push client images.")
1✔
823
                return errors.New("remote client registry is not configured. To configure it, run: 'astro config set remote.client_registry <endpoint>' and try again.")
1✔
824
        }
1✔
825

826
        // Use consistent deploy-<timestamp> tagging mechanism like regular deploys
827
        // The ImageName flag only specifies which local image to use, not the remote tag
828
        imageTag := "deploy-" + time.Now().UTC().Format("2006-01-02T15-04")
5✔
829

5✔
830
        // Build the full remote image name
5✔
831
        remoteImage := fmt.Sprintf("%s:%s", registryEndpoint, imageTag)
5✔
832

5✔
833
        // Create an image handler for building and pushing
5✔
834
        imageHandler := airflowImageHandler(remoteImage)
5✔
835

5✔
836
        if deployInput.ImageName != "" {
6✔
837
                // Use the provided local image (tag will be ignored, remote tag is always timestamp-based)
1✔
838
                fmt.Println("Using provided image:", deployInput.ImageName)
1✔
839
                err := imageHandler.TagLocalImage(deployInput.ImageName)
1✔
840
                if err != nil {
1✔
841
                        return fmt.Errorf("failed to tag local image: %w", err)
×
842
                }
×
843
        } else {
4✔
844
                // Authenticate with the base image registry before building
4✔
845
                // This is needed because Dockerfile.client uses base images from a private registry
4✔
846

4✔
847
                // Skip registry login if the base image registry is not from astronomer, check the content of the Dockerfile.client file
4✔
848
                dockerfileClientContent, err := fileutil.ReadFileToString(filepath.Join(deployInput.Path, "Dockerfile.client"))
4✔
849
                if util.IsAstronomerRegistry(dockerfileClientContent) || err != nil {
8✔
850
                        // login to the registry
4✔
851
                        if err != nil {
8✔
852
                                fmt.Println("WARNING: Failed to read Dockerfile.client, so will assume the base image is using images.astronomer.cloud and try to login to the registry")
4✔
853
                        }
4✔
854
                        baseImageRegistry := config.CFG.RemoteBaseImageRegistry.GetString()
4✔
855
                        fmt.Printf("Authenticating with base image registry: %s\n", baseImageRegistry)
4✔
856
                        err := airflow.DockerLogin(baseImageRegistry, registryUsername, c.Token)
4✔
857
                        if err != nil {
5✔
858
                                fmt.Println("Failed to authenticate with Astronomer registry that contains the base agent image used in the Dockerfile.client file.")
1✔
859
                                fmt.Println("This could be because either your token has expired or you don't have permission to pull the base agent image.")
1✔
860
                                fmt.Println("Please re-login via `astro login` to refresh the credentials or validate that `ASTRO_API_TOKEN` environment variable is set with the correct token and try again")
1✔
861
                                return fmt.Errorf("failed to authenticate with registry %s: %w", baseImageRegistry, err)
1✔
862
                        }
1✔
863
                }
864

865
                // Build the client image from the current directory
866
                // Determine target platforms for client deploy
867
                var targetPlatforms []string
3✔
868
                if deployInput.Platform != "" {
3✔
869
                        // Parse comma-separated platforms from --platform flag
×
870
                        targetPlatforms = strings.Split(deployInput.Platform, ",")
×
871
                        // Trim whitespace from each platform
×
872
                        for i, platform := range targetPlatforms {
×
873
                                targetPlatforms[i] = strings.TrimSpace(platform)
×
874
                        }
×
875
                        fmt.Printf("Building client image for platforms: %s\n", strings.Join(targetPlatforms, ", "))
×
876
                } else {
3✔
877
                        // Use empty slice to let Docker build for host platform by default
3✔
878
                        targetPlatforms = []string{}
3✔
879
                        fmt.Println("Building client image for host platform")
3✔
880
                }
3✔
881

882
                buildConfig := types.ImageBuildConfig{
3✔
883
                        Path:            deployInput.Path,
3✔
884
                        TargetPlatforms: targetPlatforms,
3✔
885
                }
3✔
886

3✔
887
                err = imageHandler.Build("Dockerfile.client", deployInput.BuildSecretString, buildConfig)
3✔
888
                if err != nil {
4✔
889
                        return fmt.Errorf("failed to build client image: %w", err)
1✔
890
                }
1✔
891
        }
892

893
        // Push the image to the remote registry (assumes docker login was done externally)
894
        fmt.Println("Pushing client image to configured remote registry")
3✔
895
        _, err = imageHandler.Push(remoteImage, "", "", false)
3✔
896
        if err != nil {
4✔
897
                if errors.Is(err, airflow.ErrImagePush403) {
1✔
898
                        fmt.Printf("\n--------------------------------\n")
×
899
                        fmt.Printf("Failed to push client image to %s\n", registryEndpoint)
×
900
                        fmt.Println("It could be due to either your registry token has expired or you don't have permission to push the client image")
×
901
                        fmt.Printf("Please ensure that you have logged in to `%s` via `docker login` and try again\n\n", registryEndpoint)
×
902
                }
×
903
                return fmt.Errorf("failed to push client image: %w", err)
1✔
904
        }
905

906
        fmt.Printf("Successfully pushed client image to %s\n", ansi.Bold(remoteImage))
2✔
907

2✔
908
        fmt.Printf("\n--------------------------------\n")
2✔
909
        fmt.Println("The client image has been pushed to your private registry.")
2✔
910
        fmt.Println("Your next step would be to update the agent component to use the new client image.")
2✔
911
        fmt.Println("For that you would either need to update the helm chart values.yaml file or update your CI/CD pipeline to use the new client image.")
2✔
912
        fmt.Printf("If you are using Astronomer provided Agent Helm chart, you would need to update the `image` field for each of the workers, dagProcessor, and triggerer component sections to the new image: %s\n", remoteImage)
2✔
913
        fmt.Println("Once you have updated the helm chart values.yaml file, you can run 'helm upgrade' or update via your CI/CD pipeline to update the agent components")
2✔
914

2✔
915
        return nil
2✔
916
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc