• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

astronomer / astro-cli / daba1279-352d-4b3f-86b8-9669f3a12ad3

09 Oct 2025 12:54PM UTC coverage: 38.501% (+0.06%) from 38.44%
daba1279-352d-4b3f-86b8-9669f3a12ad3

Pull #1954

circleci

feluelle
Move client deploy to a new remote deploy command
Pull Request #1954: Add client deploy support for RE projects

99 of 125 new or added lines in 6 files covered. (79.2%)

54 existing lines in 5 files now uncovered.

24125 of 62661 relevant lines covered (38.5%)

10.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.3
/cloud/deploy/deploy.go
1
package deploy
2

3
import (
4
        "bufio"
5
        "bytes"
6
        httpContext "context"
7
        "fmt"
8
        "os"
9
        "path/filepath"
10
        "strings"
11
        "time"
12

13
        "github.com/astronomer/astro-cli/airflow"
14
        "github.com/astronomer/astro-cli/airflow/types"
15
        airflowversions "github.com/astronomer/astro-cli/airflow_versions"
16
        astrocore "github.com/astronomer/astro-cli/astro-client-core"
17
        astroplatformcore "github.com/astronomer/astro-cli/astro-client-platform-core"
18
        "github.com/astronomer/astro-cli/cloud/deployment"
19
        "github.com/astronomer/astro-cli/cloud/organization"
20
        "github.com/astronomer/astro-cli/config"
21
        "github.com/astronomer/astro-cli/docker"
22
        "github.com/astronomer/astro-cli/pkg/ansi"
23
        "github.com/astronomer/astro-cli/pkg/azure"
24
        "github.com/astronomer/astro-cli/pkg/fileutil"
25
        "github.com/astronomer/astro-cli/pkg/httputil"
26
        "github.com/astronomer/astro-cli/pkg/input"
27
        "github.com/astronomer/astro-cli/pkg/logger"
28
        "github.com/astronomer/astro-cli/pkg/util"
29
        "github.com/pkg/errors"
30
)
31

32
const (
33
        parse                  = "parse"
34
        astroDomain            = "astronomer.io"
35
        registryUsername       = "cli"
36
        runtimeImageLabel      = airflow.RuntimeImageLabel
37
        dagParseAllowedVersion = "4.1.0"
38

39
        composeImageBuildingPromptMsg     = "Building image..."
40
        composeSkipImageBuildingPromptMsg = "Skipping building image..."
41
        deploymentHeaderMsg               = "Authenticated to %s \n\n"
42

43
        warningInvalidImageNameMsg = "WARNING! The image in your Dockerfile '%s' is not based on Astro Runtime and is not supported. Change your Dockerfile with an image that pulls from 'quay.io/astronomer/astro-runtime' to proceed.\n"
44

45
        allTests                 = "all-tests"
46
        parseAndPytest           = "parse-and-all-tests"
47
        enableDagDeployMsg       = "DAG-only deploys are not enabled for this Deployment. Run 'astro deployment update %s --dag-deploy enable' to enable DAG-only deploys"
48
        dagDeployDisabled        = "dag deploy is not enabled for deployment"
49
        invalidWorkspaceID       = "Invalid workspace id %s was provided through the --workspace-id flag\n"
50
        errCiCdEnforcementUpdate = "cannot deploy since ci/cd enforcement is enabled for the deployment %s. Please use API Tokens instead"
51
)
52

53
var (
54
        pytestFile string
55
        dockerfile = "Dockerfile"
56

57
        deployImagePlatformSupport = []string{"linux/amd64"}
58

59
        // Monkey patched to write unit tests
60
        airflowImageHandler  = airflow.ImageHandlerInit
61
        containerHandlerInit = airflow.ContainerHandlerInit
62
        azureUploader        = azure.Upload
63
        canCiCdDeploy        = deployment.CanCiCdDeploy
64
        dagTarballVersion    = ""
65
        dagsUploadURL        = ""
66
        nextTag              = ""
67
)
68

69
var (
70
        errDagsParseFailed = errors.New("your local DAGs did not parse. Fix the listed errors or use `astro deploy [deployment-id] -f` to force deploy") //nolint:revive
71
        envFileMissing     = errors.New("Env file path is incorrect: ")                                                                                  //nolint:revive
72
)
73

74
var (
75
        sleepTime              = 90
76
        dagOnlyDeploySleepTime = 30
77
        tickNum                = 10
78
        timeoutNum             = 180
79
)
80

81
// deploymentInfo is the snapshot of a Deployment that Deploy works from.
//
// The field order is significant: getDeploymentInfo builds this struct with
// a positional (unkeyed) literal, so fields must not be reordered.
type deploymentInfo struct {
	deploymentID             string // ID of the targeted Deployment
	namespace                string // Deployment namespace; also used to derive the local image name
	deployImage              string // local image name built from the namespace and the "latest" tag
	currentVersion           string // runtime version reported for the Deployment
	organizationID           string // organization that owns the Deployment
	workspaceID              string // workspace that owns the Deployment
	webserverURL             string // Airflow UI URL printed after a successful deploy
	deploymentType           string // string form of the Deployment type
	desiredDagTarballVersion string // desired DAG tarball version reported by the API, "" when absent
	dagDeployEnabled         bool   // whether DAG-only deploys are enabled
	cicdEnforcement          bool   // whether CI/CD enforcement is enabled
	name                     string // Deployment name, used in error messages
	isRemoteExecutionEnabled bool   // whether remote execution is enabled for the Deployment
}
96

97
// InputDeploy carries the caller-supplied options for Deploy.
type InputDeploy struct {
	Path              string // project directory; the Dockerfile, .dockerignore and default dags folder are resolved against it
	RuntimeID         string // Deployment ID to deploy to; when empty the configured project deployment is tried
	WsID              string // workspace ID; Deploy stops when it differs from the Deployment's workspace
	Pytest            string // test selector handed to parseOrPytestDAG ("parse", "all-tests", "parse-and-all-tests" or a test file)
	EnvFile           string // env file path; a missing file is only tolerated when the path is ".env"
	ImageName         string // when non-empty the image build is skipped and this local image is tagged instead
	DeploymentName    string // Deployment name used for lookup; ignored with a notice when an ID is also given
	Prompt            bool   // force the Deployment lookup through deployment.GetDeployment even when an ID is known
	Dags              bool   // request a DAG-only deploy
	Image             bool   // request an image-only deploy
	WaitForStatus     bool   // poll Deployment health after finalizing the deploy
	DagsPath          string // dags directory; defaults to <Path>/dags when empty
	Description       string // description sent with the create-deploy request
	BuildSecretString string // build secrets forwarded to image build, parse and pytest
	ForceUpgradeToAF3 bool   // forwarded to buildImage
}
114

115
// InputClientDeploy contains inputs for client image deployments.
type InputClientDeploy struct {
	Path              string // project directory
	ImageName         string // name of the client image
	Platform          string // target platform for the client image
	BuildSecretString string // build secrets for the image build
}
122

123
const accessYourDeploymentFmt = `
124

125
 Access your Deployment:
126

127
 Deployment View: %s
128
 Airflow UI: %s
129
`
130

131
// removeDagsFromDockerIgnore deletes every line that is exactly "dags/" from
// the file at fullpath.
//
// When at least one such line is found, the remaining lines are written back
// joined by "\n" with leading and trailing newlines trimmed. When no "dags/"
// line exists the file is left untouched, so a user's .dockerignore is not
// rewritten (and its trailing newline stripped) on every deploy.
//
// It returns any error from reading, scanning or writing the file.
func removeDagsFromDockerIgnore(fullpath string) error {
	// Read the whole file first so no handle is open on it while it is rewritten.
	content, err := os.ReadFile(fullpath)
	if err != nil {
		return err
	}

	var kept bytes.Buffer
	removed := false

	scanner := bufio.NewScanner(bytes.NewReader(content))
	for scanner.Scan() {
		line := scanner.Text()
		if line == "dags/" {
			removed = true
			continue
		}
		kept.WriteString(line)
		kept.WriteByte('\n')
	}
	if err := scanner.Err(); err != nil {
		return err
	}

	// Nothing to remove: do not modify the file.
	if !removed {
		return nil
	}

	return os.WriteFile(fullpath, bytes.Trim(kept.Bytes(), "\n"), 0o666) //nolint:gosec, mnd
}
163

164
func shouldIncludeMonitoringDag(deploymentType astroplatformcore.DeploymentType) bool {
20✔
165
        return !organization.IsOrgHosted() && !deployment.IsDeploymentDedicated(deploymentType) && !deployment.IsDeploymentStandard(deploymentType)
20✔
166
}
20✔
167

168
func deployDags(path, dagsPath, dagsUploadURL, currentRuntimeVersion string, deploymentType astroplatformcore.DeploymentType) (string, error) {
20✔
169
        if shouldIncludeMonitoringDag(deploymentType) {
36✔
170
                monitoringDagPath := filepath.Join(dagsPath, "astronomer_monitoring_dag.py")
16✔
171

16✔
172
                // Create monitoring dag file
16✔
173
                err := fileutil.WriteStringToFile(monitoringDagPath, airflow.Af2MonitoringDag)
16✔
174
                if err != nil {
16✔
175
                        return "", err
×
176
                }
×
177

178
                // Remove the monitoring dag file after the upload
179
                defer os.Remove(monitoringDagPath)
16✔
180
        }
181

182
        versionID, err := UploadBundle(path, dagsPath, dagsUploadURL, true, currentRuntimeVersion)
20✔
183
        if err != nil {
20✔
184
                return "", err
×
185
        }
×
186

187
        return versionID, nil
20✔
188
}
189

190
// Deploy pushes a new docker image
191
func Deploy(deployInput InputDeploy, platformCoreClient astroplatformcore.CoreClient, coreClient astrocore.CoreClient) error { //nolint
41✔
192
        c, err := config.GetCurrentContext()
41✔
193
        if err != nil {
42✔
194
                return err
1✔
195
        }
1✔
196

197
        if c.Domain == astroDomain {
43✔
198
                fmt.Printf(deploymentHeaderMsg, "Astro")
3✔
199
        } else {
40✔
200
                fmt.Printf(deploymentHeaderMsg, c.Domain)
37✔
201
        }
37✔
202

203
        deployInfo, err := getDeploymentInfo(deployInput.RuntimeID, deployInput.WsID, deployInput.DeploymentName, deployInput.Prompt, platformCoreClient, coreClient)
40✔
204
        if err != nil {
40✔
205
                return err
×
206
        }
×
207

208
        var dagsPath string
40✔
209
        if deployInput.DagsPath != "" {
55✔
210
                dagsPath = deployInput.DagsPath
15✔
211
        } else {
40✔
212
                dagsPath = filepath.Join(deployInput.Path, "dags")
25✔
213
        }
25✔
214

215
        var dagFiles []string
40✔
216
        if !deployInfo.isRemoteExecutionEnabled {
75✔
217
                dagFiles = fileutil.GetFilesWithSpecificExtension(dagsPath, ".py")
35✔
218
        }
35✔
219

220
        if deployInfo.cicdEnforcement {
41✔
221
                if !canCiCdDeploy(c.Token) {
2✔
222
                        return fmt.Errorf(errCiCdEnforcementUpdate, deployInfo.name) //nolint
1✔
223
                }
1✔
224
        }
225

226
        if deployInput.WsID != deployInfo.workspaceID {
40✔
227
                fmt.Printf(invalidWorkspaceID, deployInput.WsID)
1✔
228
                return nil
1✔
229
        }
1✔
230

231
        if deployInput.Image {
41✔
232
                if !deployInfo.dagDeployEnabled {
3✔
233
                        return fmt.Errorf(enableDagDeployMsg, deployInfo.deploymentID) //nolint
×
234
                }
×
235
        }
236

237
        deploymentURL, err := deployment.GetDeploymentURL(deployInfo.deploymentID, deployInfo.workspaceID)
38✔
238
        if err != nil {
38✔
239
                return err
×
240
        }
×
241
        createDeployRequest := astroplatformcore.CreateDeployRequest{
38✔
242
                Description: &deployInput.Description,
38✔
243
        }
38✔
244
        switch {
38✔
245
        case deployInput.Dags:
18✔
246
                createDeployRequest.Type = astroplatformcore.CreateDeployRequestTypeDAGONLY
18✔
247
        case deployInput.Image:
1✔
248
                createDeployRequest.Type = astroplatformcore.CreateDeployRequestTypeIMAGEONLY
1✔
249
        default:
19✔
250
                createDeployRequest.Type = astroplatformcore.CreateDeployRequestTypeIMAGEANDDAG
19✔
251
        }
252
        deploy, err := createDeploy(deployInfo.organizationID, deployInfo.deploymentID, createDeployRequest, platformCoreClient)
38✔
253
        if err != nil {
38✔
254
                return err
×
255
        }
×
256
        deployID := deploy.Id
38✔
257
        if deploy.DagsUploadUrl != nil {
76✔
258
                dagsUploadURL = *deploy.DagsUploadUrl
38✔
259
        } else {
38✔
260
                dagsUploadURL = ""
×
261
        }
×
262
        if deploy.ImageTag != "" {
38✔
263
                nextTag = deploy.ImageTag
×
264
        } else {
38✔
265
                nextTag = ""
38✔
266
        }
38✔
267

268
        if deployInput.Dags {
56✔
269
                if len(dagFiles) == 0 && config.CFG.ShowWarnings.GetBool() {
19✔
270
                        i, _ := input.Confirm("Warning: No DAGs found. This will delete any existing DAGs. Are you sure you want to deploy?")
1✔
271

1✔
272
                        if !i {
2✔
273
                                fmt.Println("Canceling deploy...")
1✔
274
                                return nil
1✔
275
                        }
1✔
276
                }
277
                if deployInput.Pytest != "" {
29✔
278
                        runtimeVersion, err := buildImage(deployInput.Path, deployInfo.currentVersion, deployInfo.deployImage, deployInput.ImageName, deployInfo.organizationID, deployInput.BuildSecretString, deployInfo.dagDeployEnabled, deployInfo.isRemoteExecutionEnabled, deployInput.ForceUpgradeToAF3, platformCoreClient)
12✔
279
                        if err != nil {
12✔
280
                                return err
×
281
                        }
×
282

283
                        err = parseOrPytestDAG(deployInput.Pytest, runtimeVersion, deployInput.EnvFile, deployInfo.deployImage, deployInfo.namespace, deployInput.BuildSecretString)
12✔
284
                        if err != nil {
14✔
285
                                return err
2✔
286
                        }
2✔
287
                }
288

289
                if !deployInfo.dagDeployEnabled {
16✔
290
                        return fmt.Errorf(enableDagDeployMsg, deployInfo.deploymentID) //nolint
1✔
291
                }
1✔
292

293
                fmt.Println("Initiating DAG deploy for: " + deployInfo.deploymentID)
14✔
294
                dagTarballVersion, err = deployDags(deployInput.Path, dagsPath, dagsUploadURL, deployInfo.currentVersion, astroplatformcore.DeploymentType(deployInfo.deploymentType))
14✔
295
                if err != nil {
14✔
296
                        if strings.Contains(err.Error(), dagDeployDisabled) {
×
297
                                return fmt.Errorf(enableDagDeployMsg, deployInfo.deploymentID) //nolint
×
298
                        }
×
299

300
                        return err
×
301
                }
302

303
                // finish deploy
304
                err = finalizeDeploy(deployID, deployInfo.deploymentID, deployInfo.organizationID, dagTarballVersion, deployInfo.dagDeployEnabled, platformCoreClient)
14✔
305
                if err != nil {
14✔
306
                        return err
×
307
                }
×
308

309
                if deployInput.WaitForStatus {
15✔
310
                        // Keeping wait timeout low since dag only deploy is faster
1✔
311
                        err = deployment.HealthPoll(deployInfo.deploymentID, deployInfo.workspaceID, dagOnlyDeploySleepTime, tickNum, timeoutNum, platformCoreClient)
1✔
312
                        if err != nil {
2✔
313
                                return err
1✔
314
                        }
1✔
315

316
                        fmt.Println(
×
317
                                "\nSuccessfully uploaded DAGs with version " + ansi.Bold(dagTarballVersion) + " to Astro. Navigate to the Airflow UI to confirm that your deploy was successful." +
×
318
                                        fmt.Sprintf(accessYourDeploymentFmt, ansi.Bold(deploymentURL), ansi.Bold(deployInfo.webserverURL)),
×
319
                        )
×
320

×
321
                        return nil
×
322
                }
323

324
                fmt.Println(
13✔
325
                        "\nSuccessfully uploaded DAGs with version " + ansi.Bold(
13✔
326
                                dagTarballVersion,
13✔
327
                        ) + " to Astro. Navigate to the Airflow UI to confirm that your deploy was successful. The Airflow UI takes about 1 minute to update." +
13✔
328
                                fmt.Sprintf(
13✔
329
                                        accessYourDeploymentFmt,
13✔
330
                                        ansi.Bold(deploymentURL),
13✔
331
                                        ansi.Bold(deployInfo.webserverURL),
13✔
332
                                ),
13✔
333
                )
13✔
334
        } else {
20✔
335
                fullpath := filepath.Join(deployInput.Path, ".dockerignore")
20✔
336
                fileExist, _ := fileutil.Exists(fullpath, nil)
20✔
337
                if fileExist {
40✔
338
                        err := removeDagsFromDockerIgnore(fullpath)
20✔
339
                        if err != nil {
20✔
340
                                return errors.Wrap(err, "Found dags entry in .dockerignore file. Remove this entry and try again")
×
341
                        }
×
342
                }
343
                envFileExists, _ := fileutil.Exists(deployInput.EnvFile, nil)
20✔
344
                if !envFileExists && deployInput.EnvFile != ".env" {
21✔
345
                        return fmt.Errorf("%w %s", envFileMissing, deployInput.EnvFile)
1✔
346
                }
1✔
347

348
                if deployInfo.dagDeployEnabled && len(dagFiles) == 0 && config.CFG.ShowWarnings.GetBool() && !deployInput.Image {
19✔
349
                        i, _ := input.Confirm("Warning: No DAGs found. This will delete any existing DAGs. Are you sure you want to deploy?")
×
350

×
351
                        if !i {
×
352
                                fmt.Println("Canceling deploy...")
×
353
                                return nil
×
354
                        }
×
355
                }
356

357
                // Build our image
358
                runtimeVersion, err := buildImage(deployInput.Path, deployInfo.currentVersion, deployInfo.deployImage, deployInput.ImageName, deployInfo.organizationID, deployInput.BuildSecretString, deployInfo.dagDeployEnabled, deployInfo.isRemoteExecutionEnabled, deployInput.ForceUpgradeToAF3, platformCoreClient)
19✔
359
                if err != nil {
19✔
360
                        return err
×
361
                }
×
362

363
                if len(dagFiles) > 0 {
26✔
364
                        err = parseOrPytestDAG(deployInput.Pytest, runtimeVersion, deployInput.EnvFile, deployInfo.deployImage, deployInfo.namespace, deployInput.BuildSecretString)
7✔
365
                        if err != nil {
8✔
366
                                return err
1✔
367
                        }
1✔
368
                } else {
12✔
369
                        fmt.Println("No DAGs found. Skipping testing...")
12✔
370
                }
12✔
371

372
                repository := deploy.ImageRepository
18✔
373
                // TODO: Resolve the edge case where two people push the same nextTag at the same time
18✔
374
                remoteImage := fmt.Sprintf("%s:%s", repository, nextTag)
18✔
375

18✔
376
                imageHandler := airflowImageHandler(deployInfo.deployImage)
18✔
377
                fmt.Println("Pushing image to Astronomer registry")
18✔
378
                _, err = imageHandler.Push(remoteImage, registryUsername, c.Token, false)
18✔
379
                if err != nil {
18✔
380
                        return err
×
381
                }
×
382

383
                if deployInfo.dagDeployEnabled && len(dagFiles) > 0 {
24✔
384
                        if !deployInput.Image {
12✔
385
                                dagTarballVersion, err = deployDags(deployInput.Path, dagsPath, dagsUploadURL, deployInfo.currentVersion, astroplatformcore.DeploymentType(deployInfo.deploymentType))
6✔
386
                                if err != nil {
6✔
387
                                        return err
×
388
                                }
×
389
                        } else {
×
390
                                fmt.Println("Image Deploy only. Skipping deploying DAG...")
×
391
                        }
×
392
                }
393
                // finish deploy
394
                err = finalizeDeploy(deployID, deployInfo.deploymentID, deployInfo.organizationID, dagTarballVersion, deployInfo.dagDeployEnabled, platformCoreClient)
18✔
395
                if err != nil {
18✔
396
                        return err
×
397
                }
×
398

399
                if deployInput.WaitForStatus {
20✔
400
                        err = deployment.HealthPoll(deployInfo.deploymentID, deployInfo.workspaceID, sleepTime, tickNum, timeoutNum, platformCoreClient)
2✔
401
                        if err != nil {
4✔
402
                                return err
2✔
403
                        }
2✔
404
                }
405

406
                fmt.Println("Successfully pushed image to Astronomer registry. Navigate to the Astronomer UI for confirmation that your deploy was successful. To deploy dags only run astro deploy --dags." +
16✔
407
                        fmt.Sprintf(accessYourDeploymentFmt, ansi.Bold("https://"+deploymentURL), ansi.Bold("https://"+deployInfo.webserverURL)))
16✔
408
        }
409

410
        return nil
29✔
411
}
412

413
func getDeploymentInfo(
414
        deploymentID, wsID, deploymentName string,
415
        prompt bool,
416
        platformCoreClient astroplatformcore.CoreClient,
417
        coreClient astrocore.CoreClient,
418
) (deploymentInfo, error) {
40✔
419
        // Use config deployment if provided
40✔
420
        if deploymentID == "" {
54✔
421
                deploymentID = config.CFG.ProjectDeployment.GetProjectString()
14✔
422
                if deploymentID != "" {
14✔
423
                        fmt.Printf("Deployment ID found in the config file. This Deployment ID will be used for the deploy\n")
×
424
                }
×
425
        }
426

427
        if deploymentID != "" && deploymentName != "" {
48✔
428
                fmt.Printf("Both a Deployment ID and Deployment name have been supplied. The Deployment ID %s will be used for the Deploy\n", deploymentID)
8✔
429
        }
8✔
430

431
        // check if deploymentID or if force prompt was requested was given by user
432
        if deploymentID == "" || prompt {
67✔
433
                currentDeployment, err := deployment.GetDeployment(wsID, deploymentID, deploymentName, false, nil, platformCoreClient, coreClient)
27✔
434
                if err != nil {
27✔
435
                        return deploymentInfo{}, err
×
436
                }
×
437
                coreDeployment, err := deployment.CoreGetDeployment(currentDeployment.OrganizationId, currentDeployment.Id, platformCoreClient)
27✔
438
                if err != nil {
27✔
439
                        return deploymentInfo{}, err
×
440
                }
×
441
                var desiredDagTarballVersion string
27✔
442
                if coreDeployment.DesiredDagTarballVersion != nil {
45✔
443
                        desiredDagTarballVersion = *coreDeployment.DesiredDagTarballVersion
18✔
444
                } else {
27✔
445
                        desiredDagTarballVersion = ""
9✔
446
                }
9✔
447

448
                return deploymentInfo{
27✔
449
                        currentDeployment.Id,
27✔
450
                        currentDeployment.Namespace,
27✔
451
                        airflow.ImageName(currentDeployment.Namespace, "latest"),
27✔
452
                        currentDeployment.RuntimeVersion,
27✔
453
                        currentDeployment.OrganizationId,
27✔
454
                        currentDeployment.WorkspaceId,
27✔
455
                        currentDeployment.WebServerUrl,
27✔
456
                        string(*currentDeployment.Type),
27✔
457
                        desiredDagTarballVersion,
27✔
458
                        currentDeployment.IsDagDeployEnabled,
27✔
459
                        currentDeployment.IsCicdEnforced,
27✔
460
                        currentDeployment.Name,
27✔
461
                        deployment.IsRemoteExecutionEnabled(&currentDeployment),
27✔
462
                }, nil
27✔
463
        }
464
        c, err := config.GetCurrentContext()
13✔
465
        if err != nil {
13✔
466
                return deploymentInfo{}, err
×
467
        }
×
468
        deployInfo, err := getImageName(deploymentID, c.Organization, platformCoreClient)
13✔
469
        if err != nil {
13✔
470
                return deploymentInfo{}, err
×
471
        }
×
472
        deployInfo.deploymentID = deploymentID
13✔
473
        return deployInfo, nil
13✔
474
}
475

476
func parseOrPytestDAG(pytest, runtimeVersion, envFile, deployImage, namespace, buildSecretString string) error {
19✔
477
        validDAGParseVersion := airflowversions.CompareRuntimeVersions(runtimeVersion, dagParseAllowedVersion) >= 0
19✔
478
        if !validDAGParseVersion {
19✔
479
                fmt.Println("\nruntime image is earlier than 4.1.0, this deploy will skip DAG parse...")
×
480
        }
×
481

482
        containerHandler, err := containerHandlerInit(config.WorkingPath, envFile, "Dockerfile", namespace)
19✔
483
        if err != nil {
19✔
484
                return err
×
485
        }
×
486

487
        switch {
19✔
488
        case pytest == parse && validDAGParseVersion:
7✔
489
                // parse dags
7✔
490
                fmt.Println("Testing image...")
7✔
491
                err := parseDAGs(deployImage, buildSecretString, containerHandler)
7✔
492
                if err != nil {
9✔
493
                        return err
2✔
494
                }
2✔
495
        case pytest != "" && pytest != parse && pytest != parseAndPytest:
6✔
496
                // check pytests
6✔
497
                fmt.Println("Testing image...")
6✔
498
                err := checkPytest(pytest, deployImage, buildSecretString, containerHandler)
6✔
499
                if err != nil {
7✔
500
                        return err
1✔
501
                }
1✔
502
        case pytest == parseAndPytest:
6✔
503
                // parse dags and check pytests
6✔
504
                fmt.Println("Testing image...")
6✔
505
                err := parseDAGs(deployImage, buildSecretString, containerHandler)
6✔
506
                if err != nil {
6✔
507
                        return err
×
508
                }
×
509

510
                err = checkPytest(pytest, deployImage, buildSecretString, containerHandler)
6✔
511
                if err != nil {
6✔
512
                        return err
×
513
                }
×
514
        }
515
        return nil
16✔
516
}
517

518
func parseDAGs(deployImage, buildSecretString string, containerHandler airflow.ContainerHandler) error {
13✔
519
        if !config.CFG.SkipParse.GetBool() && !util.CheckEnvBool(os.Getenv("ASTRONOMER_SKIP_PARSE")) {
26✔
520
                err := containerHandler.Parse("", deployImage, buildSecretString)
13✔
521
                if err != nil {
15✔
522
                        fmt.Println(err)
2✔
523
                        return errDagsParseFailed
2✔
524
                }
2✔
525
        } else {
×
526
                fmt.Println("Skipping parsing dags due to skip parse being set to true in either the config.yaml or local environment variables")
×
527
        }
×
528

529
        return nil
11✔
530
}
531

532
// Validate code with pytest
533
func checkPytest(pytest, deployImage, buildSecretString string, containerHandler airflow.ContainerHandler) error {
14✔
534
        if pytest != allTests && pytest != parseAndPytest {
18✔
535
                pytestFile = pytest
4✔
536
        }
4✔
537

538
        exitCode, err := containerHandler.Pytest(pytestFile, "", deployImage, "", buildSecretString)
14✔
539
        if err != nil {
17✔
540
                if strings.Contains(exitCode, "1") { // exit code is 1 meaning tests failed
4✔
541
                        return errors.New("at least 1 pytest in your tests directory failed. Fix the issues listed or rerun the command without the '--pytest' flag to deploy")
1✔
542
                }
1✔
543
                return errors.Wrap(err, "Something went wrong while Pytesting your DAGs,\nif the issue persists rerun the command without the '--pytest' flag to deploy")
2✔
544
        }
545

546
        fmt.Print("\nAll Pytests passed!\n")
11✔
547
        return err
11✔
548
}
549

550
func getImageName(deploymentID, organizationID string, platformCoreClient astroplatformcore.CoreClient) (deploymentInfo, error) {
13✔
551
        resp, err := platformCoreClient.GetDeploymentWithResponse(httpContext.Background(), organizationID, deploymentID)
13✔
552
        if err != nil {
13✔
553
                return deploymentInfo{}, err
×
554
        }
×
555

556
        err = astrocore.NormalizeAPIError(resp.HTTPResponse, resp.Body)
13✔
557
        if err != nil {
13✔
558
                return deploymentInfo{}, err
×
559
        }
×
560

561
        currentVersion := resp.JSON200.RuntimeVersion
13✔
562
        namespace := resp.JSON200.Namespace
13✔
563
        workspaceID := resp.JSON200.WorkspaceId
13✔
564
        webserverURL := resp.JSON200.WebServerUrl
13✔
565
        dagDeployEnabled := resp.JSON200.IsDagDeployEnabled
13✔
566
        cicdEnforcement := resp.JSON200.IsCicdEnforced
13✔
567
        isRemoteExecutionEnabled := deployment.IsRemoteExecutionEnabled(resp.JSON200)
13✔
568
        var desiredDagTarballVersion string
13✔
569
        if resp.JSON200.DesiredDagTarballVersion != nil {
18✔
570
                desiredDagTarballVersion = *resp.JSON200.DesiredDagTarballVersion
5✔
571
        } else {
13✔
572
                desiredDagTarballVersion = ""
8✔
573
        }
8✔
574

575
        // We use latest and keep this tag around after deployments to keep subsequent deploys quick
576
        deployImage := airflow.ImageName(namespace, "latest")
13✔
577

13✔
578
        return deploymentInfo{
13✔
579
                namespace:                namespace,
13✔
580
                deployImage:              deployImage,
13✔
581
                currentVersion:           currentVersion,
13✔
582
                organizationID:           organizationID,
13✔
583
                workspaceID:              workspaceID,
13✔
584
                webserverURL:             webserverURL,
13✔
585
                dagDeployEnabled:         dagDeployEnabled,
13✔
586
                desiredDagTarballVersion: desiredDagTarballVersion,
13✔
587
                cicdEnforcement:          cicdEnforcement,
13✔
588
                isRemoteExecutionEnabled: isRemoteExecutionEnabled,
13✔
589
        }, nil
13✔
590
}
591

592
func buildImageWithoutDags(path, buildSecretString string, imageHandler airflow.ImageHandler) error {
19✔
593
        // flag to determine if we are setting the dags folder in dockerignore
19✔
594
        dagsIgnoreSet := false
19✔
595
        // flag to determine if dockerignore file was created on runtime
19✔
596
        dockerIgnoreCreate := false
19✔
597
        fullpath := filepath.Join(path, ".dockerignore")
19✔
598

19✔
599
        defer func() {
38✔
600
                // remove dags from .dockerignore file if we set it
19✔
601
                if dagsIgnoreSet {
38✔
602
                        removeDagsFromDockerIgnore(fullpath) //nolint:errcheck
19✔
603
                }
19✔
604
                // remove created docker ignore file
605
                if dockerIgnoreCreate {
19✔
606
                        os.Remove(fullpath)
×
607
                }
×
608
        }()
609

610
        fileExist, _ := fileutil.Exists(fullpath, nil)
19✔
611
        if !fileExist {
19✔
612
                // Create a dockerignore file and add the dags folder entry
×
613
                err := fileutil.WriteStringToFile(fullpath, "dags/")
×
614
                if err != nil {
×
615
                        return err
×
616
                }
×
617
                dockerIgnoreCreate = true
×
618
        }
619
        lines, err := fileutil.Read(fullpath)
19✔
620
        if err != nil {
19✔
621
                return err
×
622
        }
×
623
        contains, _ := fileutil.Contains(lines, "dags/")
19✔
624
        if !contains {
38✔
625
                f, err := os.OpenFile(fullpath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) //nolint:mnd
19✔
626
                if err != nil {
19✔
627
                        return err
×
628
                }
×
629

630
                defer f.Close()
19✔
631

19✔
632
                if _, err := f.WriteString("\ndags/"); err != nil {
19✔
633
                        return err
×
634
                }
×
635

636
                dagsIgnoreSet = true
19✔
637
        }
638
        err = imageHandler.Build("", buildSecretString, types.ImageBuildConfig{Path: path, TargetPlatforms: deployImagePlatformSupport})
19✔
639
        if err != nil {
19✔
640
                return err
×
641
        }
×
642

643
        // remove dags from .dockerignore file if we set it
644
        if dagsIgnoreSet {
38✔
645
                err = removeDagsFromDockerIgnore(fullpath)
19✔
646
                if err != nil {
19✔
647
                        return err
×
648
                }
×
649
        }
650

651
        return nil
19✔
652
}
653

654
func buildImage(path, currentVersion, deployImage, imageName, organizationID, buildSecretString string, dagDeployEnabled, isRemoteExecutionEnabled, forceUpgradeToAF3 bool, platformCoreClient astroplatformcore.CoreClient) (version string, err error) {
34✔
655
        imageHandler := airflowImageHandler(deployImage)
34✔
656

34✔
657
        if imageName == "" {
61✔
658
                // Build our image
27✔
659
                fmt.Println(composeImageBuildingPromptMsg)
27✔
660

27✔
661
                if dagDeployEnabled || isRemoteExecutionEnabled {
46✔
662
                        err := buildImageWithoutDags(path, buildSecretString, imageHandler)
19✔
663
                        if err != nil {
19✔
664
                                return "", err
×
665
                        }
×
666
                } else {
8✔
667
                        err := imageHandler.Build("", buildSecretString, types.ImageBuildConfig{Path: path, TargetPlatforms: deployImagePlatformSupport})
8✔
668
                        if err != nil {
9✔
669
                                return "", err
1✔
670
                        }
1✔
671
                }
672
        } else {
7✔
673
                // skip build if an imageName is passed
7✔
674
                fmt.Println(composeSkipImageBuildingPromptMsg)
7✔
675

7✔
676
                err := imageHandler.TagLocalImage(imageName)
7✔
677
                if err != nil {
7✔
678
                        return "", err
×
679
                }
×
680
        }
681

682
        // parse dockerfile
683
        cmds, err := docker.ParseFile(filepath.Join(path, dockerfile))
33✔
684
        if err != nil {
34✔
685
                return "", errors.Wrapf(err, "failed to parse dockerfile: %s", filepath.Join(path, dockerfile))
1✔
686
        }
1✔
687

688
        DockerfileImage := docker.GetImageFromParsedFile(cmds)
32✔
689

32✔
690
        version, err = imageHandler.GetLabel("", runtimeImageLabel)
32✔
691
        if err != nil {
32✔
692
                fmt.Println("unable get runtime version from image")
×
693
        }
×
694

695
        if config.CFG.ShowWarnings.GetBool() && version == "" {
32✔
696
                fmt.Printf(warningInvalidImageNameMsg, DockerfileImage)
×
697
                fmt.Println("Canceling deploy...")
×
698
                os.Exit(1)
×
699
        }
×
700

701
        resp, err := platformCoreClient.GetDeploymentOptionsWithResponse(httpContext.Background(), organizationID, &astroplatformcore.GetDeploymentOptionsParams{})
32✔
702
        if err != nil {
33✔
703
                return "", err
1✔
704
        }
1✔
705
        err = astrocore.NormalizeAPIError(resp.HTTPResponse, resp.Body)
31✔
706
        if err != nil {
31✔
707
                return "", err
×
708
        }
×
709
        deploymentOptionsRuntimeVersions := []string{}
31✔
710
        for _, runtimeRelease := range resp.JSON200.RuntimeReleases {
217✔
711
                deploymentOptionsRuntimeVersions = append(deploymentOptionsRuntimeVersions, runtimeRelease.Version)
186✔
712
        }
186✔
713

714
        if !ValidRuntimeVersion(currentVersion, version, deploymentOptionsRuntimeVersions, forceUpgradeToAF3) {
31✔
715
                fmt.Println("Canceling deploy...")
×
716
                os.Exit(1)
×
717
        }
×
718

719
        WarnIfNonLatestVersion(version, httputil.NewHTTPClient())
31✔
720

31✔
721
        return version, nil
31✔
722
}
723

724
// finalize deploy
725
func finalizeDeploy(deployID, deploymentID, organizationID, dagTarballVersion string, dagDeploy bool, platformCoreClient astroplatformcore.CoreClient) error {
32✔
726
        finalizeDeployRequest := astroplatformcore.FinalizeDeployRequest{}
32✔
727
        if dagDeploy {
54✔
728
                finalizeDeployRequest.DagTarballVersion = &dagTarballVersion
22✔
729
        }
22✔
730
        resp, err := platformCoreClient.FinalizeDeployWithResponse(httpContext.Background(), organizationID, deploymentID, deployID, finalizeDeployRequest)
32✔
731
        if err != nil {
32✔
732
                return err
×
733
        }
×
734
        err = astrocore.NormalizeAPIError(resp.HTTPResponse, resp.Body)
32✔
735
        if err != nil {
32✔
736
                return err
×
737
        }
×
738
        if resp.JSON200.DagTarballVersion != nil {
64✔
739
                fmt.Println("Deployed DAG bundle: ", *resp.JSON200.DagTarballVersion)
32✔
740
        }
32✔
741
        if resp.JSON200.ImageTag != "" {
64✔
742
                fmt.Println("Deployed Image Tag: ", resp.JSON200.ImageTag)
32✔
743
        }
32✔
744
        return nil
32✔
745
}
746

747
func createDeploy(organizationID, deploymentID string, request astroplatformcore.CreateDeployRequest, platformCoreClient astroplatformcore.CoreClient) (*astroplatformcore.Deploy, error) {
38✔
748
        resp, err := platformCoreClient.CreateDeployWithResponse(httpContext.Background(), organizationID, deploymentID, request)
38✔
749
        if err != nil {
38✔
750
                return nil, err
×
751
        }
×
752
        err = astrocore.NormalizeAPIError(resp.HTTPResponse, resp.Body)
38✔
753
        if err != nil {
38✔
754
                return nil, err
×
755
        }
×
756
        return resp.JSON200, err
38✔
757
}
758

759
func ValidRuntimeVersion(currentVersion, tag string, deploymentOptionsRuntimeVersions []string, forceUpgradeToAF3 bool) bool {
42✔
760
        // Allow old deployments which do not have runtimeVersion tag
42✔
761
        if currentVersion == "" {
43✔
762
                return true
1✔
763
        }
1✔
764

765
        // Check that the tag is not a downgrade
766
        if airflowversions.CompareRuntimeVersions(tag, currentVersion) < 0 {
44✔
767
                fmt.Printf("Cannot deploy a downgraded Astro Runtime version. Modify your Astro Runtime version to %s or higher in your Dockerfile\n", currentVersion)
3✔
768
                return false
3✔
769
        }
3✔
770

771
        // Check that the tag is supported by the deployment
772
        tagInDeploymentOptions := false
38✔
773
        for _, runtimeVersion := range deploymentOptionsRuntimeVersions {
102✔
774
                if airflowversions.CompareRuntimeVersions(tag, runtimeVersion) == 0 {
101✔
775
                        tagInDeploymentOptions = true
37✔
776
                        break
37✔
777
                }
778
        }
779
        if !tagInDeploymentOptions {
39✔
780
                fmt.Println("Cannot deploy an unsupported Astro Runtime version. Modify your Astro Runtime version to a supported version in your Dockerfile")
1✔
781
                fmt.Printf("Supported versions: %s\n", strings.Join(deploymentOptionsRuntimeVersions, ", "))
1✔
782
                return false
1✔
783
        }
1✔
784

785
        // If upgrading from Airflow 2 to Airflow 3, we require at least Runtime 12.0.0 (Airflow 2.10.0) and that the user has forced the upgrade
786
        currentVersionAirflowMajorVersion := airflowversions.AirflowMajorVersionForRuntimeVersion(currentVersion)
37✔
787
        tagAirflowMajorVersion := airflowversions.AirflowMajorVersionForRuntimeVersion(tag)
37✔
788
        if currentVersionAirflowMajorVersion == "2" && tagAirflowMajorVersion == "3" {
40✔
789
                if airflowversions.CompareRuntimeVersions(currentVersion, "12.0.0") < 0 {
4✔
790
                        fmt.Println("Can only upgrade deployment from Airflow 2 to Airflow 3 with deployment at Astro Runtime 12.0.0 or higher")
1✔
791
                        return false
1✔
792
                }
1✔
793
                if !forceUpgradeToAF3 {
3✔
794
                        fmt.Println("Can only upgrade deployment from Airflow 2 to Airflow 3 with the --force-upgrade-to-af3 flag")
1✔
795
                        return false
1✔
796
                }
1✔
797
        }
798

799
        return true
35✔
800
}
801

802
func WarnIfNonLatestVersion(version string, httpClient *httputil.HTTPClient) {
34✔
803
        client := airflowversions.NewClient(httpClient, false, false)
34✔
804
        latestRuntimeVersion, err := airflowversions.GetDefaultImageTag(client, "", false)
34✔
805
        if err != nil {
36✔
806
                logger.Debugf("unable to get latest runtime version: %s", err)
2✔
807
                return
2✔
808
        }
2✔
809

810
        if airflowversions.CompareRuntimeVersions(version, latestRuntimeVersion) < 0 {
64✔
811
                fmt.Printf("WARNING! You are currently running Astro Runtime Version %s\nConsider upgrading to the latest version, Astro Runtime %s\n", version, latestRuntimeVersion)
32✔
812
        }
32✔
813
}
814

815
// DeployClientImage handles the client deploy functionality
816
func DeployClientImage(deployInput InputClientDeploy) error { //nolint:gocritic
6✔
817
        c, err := config.GetCurrentContext()
6✔
818
        if err != nil {
6✔
NEW
819
                return errors.Wrap(err, "failed to get current context")
×
NEW
820
        }
×
821

822
        fmt.Printf(deploymentHeaderMsg, "Astro")
6✔
823

6✔
824
        // Get the remote client registry endpoint from config
6✔
825
        registryEndpoint := config.CFG.RemoteClientRegistry.GetString()
6✔
826
        if registryEndpoint == "" {
7✔
827
                return errors.New("remote client registry is not configured. Please run 'astro config set remote.client_registry <endpoint>' to configure the registry")
1✔
828
        }
1✔
829

830
        // Use consistent deploy-<timestamp> tagging mechanism like regular deploys
831
        // The ImageName flag only specifies which local image to use, not the remote tag
832
        imageTag := "deploy-" + time.Now().UTC().Format("2006-01-02T15-04")
5✔
833

5✔
834
        // Build the full remote image name
5✔
835
        remoteImage := fmt.Sprintf("%s:%s", registryEndpoint, imageTag)
5✔
836

5✔
837
        // Create an image handler for building and pushing
5✔
838
        imageHandler := airflowImageHandler(remoteImage)
5✔
839

5✔
840
        if deployInput.ImageName != "" {
6✔
841
                // Use the provided local image (tag will be ignored, remote tag is always timestamp-based)
1✔
842
                fmt.Println("Using provided image:", deployInput.ImageName)
1✔
843
                err := imageHandler.TagLocalImage(deployInput.ImageName)
1✔
844
                if err != nil {
1✔
NEW
845
                        return fmt.Errorf("failed to tag local image: %w", err)
×
NEW
846
                }
×
847
        } else {
4✔
848
                // Authenticate with the base image registry before building
4✔
849
                // This is needed because Dockerfile.client uses base images from a private registry
4✔
850
                baseImageRegistry := config.CFG.RemoteBaseImageRegistry.GetString()
4✔
851
                err := airflow.DockerLogin(baseImageRegistry, registryUsername, c.Token)
4✔
852
                if err != nil {
5✔
853
                        return fmt.Errorf("failed to authenticate with registry %s: %w", baseImageRegistry, err)
1✔
854
                }
1✔
855

856
                // Build the client image from the current directory
857
                // Determine target platforms for client deploy
858
                var targetPlatforms []string
3✔
859
                if deployInput.Platform != "" {
3✔
NEW
860
                        // Parse comma-separated platforms from --platform flag
×
NEW
861
                        targetPlatforms = strings.Split(deployInput.Platform, ",")
×
NEW
862
                        // Trim whitespace from each platform
×
NEW
863
                        for i, platform := range targetPlatforms {
×
NEW
864
                                targetPlatforms[i] = strings.TrimSpace(platform)
×
NEW
865
                        }
×
NEW
866
                        fmt.Printf("Building client image for platforms: %s\n", strings.Join(targetPlatforms, ", "))
×
867
                } else {
3✔
868
                        // Use empty slice to let Docker build for host platform by default
3✔
869
                        targetPlatforms = []string{}
3✔
870
                        fmt.Println("Building client image for host platform")
3✔
871
                }
3✔
872

873
                buildConfig := types.ImageBuildConfig{
3✔
874
                        Path:            deployInput.Path,
3✔
875
                        TargetPlatforms: targetPlatforms,
3✔
876
                }
3✔
877

3✔
878
                err = imageHandler.Build("Dockerfile.client", deployInput.BuildSecretString, buildConfig)
3✔
879
                if err != nil {
4✔
880
                        return fmt.Errorf("failed to build client image: %w", err)
1✔
881
                }
1✔
882
        }
883

884
        // Push the image to the remote registry (assumes docker login was done externally)
885
        fmt.Println("Pushing client image to configured remote registry")
3✔
886
        _, err = imageHandler.Push(remoteImage, "", "", false)
3✔
887
        if err != nil {
4✔
888
                return fmt.Errorf("failed to push client image: %w", err)
1✔
889
        }
1✔
890

891
        fmt.Printf("Successfully pushed client image to %s\n", ansi.Bold(remoteImage))
2✔
892
        return nil
2✔
893
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc