• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

astronomer / astro-cli / fc632fcc-6f64-4bcc-a917-49c509c77432

28 Nov 2025 01:32AM UTC coverage: 39.41% (+6.3%) from 33.132%
fc632fcc-6f64-4bcc-a917-49c509c77432

Pull #1988

circleci

Simpcyclassy
Add Houston API 1.1.0 support - remove desiredRuntimeVersion and AC fields

- Add version 1.1.0 queries for DeploymentGetRequest
- Add version 1.1.0 queries for DeploymentsGetRequest
- Add version 1.1.0 queries for PaginatedDeploymentsGetRequest
- Add version 1.1.0 mutations for DeploymentCreateRequest
- Add version 1.1.0 mutations for DeploymentUpdateRequest
- Remove airflowVersion, desiredAirflowVersion, desiredRuntimeVersion fields
- Use runtimeVersion and runtimeAirflowVersion instead

Houston API 1.1.0 no longer returns desired version and AC fields.
All version information is now managed through runtimeVersion.
Pull Request #1988: Add Houston API 1.1.0 support - remove desiredRuntimeVersion and AC fields

23684 of 60096 relevant lines covered (39.41%)

11.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.7
/cloud/deploy/deploy.go
1
package deploy
2

3
import (
4
        "bufio"
5
        "bytes"
6
        httpContext "context"
7
        "fmt"
8
        "os"
9
        "path/filepath"
10
        "strings"
11

12
        "github.com/astronomer/astro-cli/airflow"
13
        "github.com/astronomer/astro-cli/airflow/types"
14
        airflowversions "github.com/astronomer/astro-cli/airflow_versions"
15
        astrocore "github.com/astronomer/astro-cli/astro-client-core"
16
        astroplatformcore "github.com/astronomer/astro-cli/astro-client-platform-core"
17
        "github.com/astronomer/astro-cli/cloud/deployment"
18
        "github.com/astronomer/astro-cli/cloud/organization"
19
        "github.com/astronomer/astro-cli/config"
20
        "github.com/astronomer/astro-cli/docker"
21
        "github.com/astronomer/astro-cli/pkg/ansi"
22
        "github.com/astronomer/astro-cli/pkg/azure"
23
        "github.com/astronomer/astro-cli/pkg/fileutil"
24
        "github.com/astronomer/astro-cli/pkg/httputil"
25
        "github.com/astronomer/astro-cli/pkg/input"
26
        "github.com/astronomer/astro-cli/pkg/logger"
27
        "github.com/astronomer/astro-cli/pkg/util"
28
        "github.com/pkg/errors"
29
)
30

31
// Constants for the deploy flow: pytest mode selectors (parse, allTests,
// parseAndPytest), registry/domain settings, and user-facing message and error
// templates. Templates containing %s are filled with fmt.Printf/fmt.Errorf at
// the call sites.
const (
32
        parse                  = "parse"
33
        astroDomain            = "astronomer.io"
34
        registryUsername       = "cli"
35
        runtimeImageLabel      = airflow.RuntimeImageLabel
36
        dagParseAllowedVersion = "4.1.0"
37

38
        composeImageBuildingPromptMsg     = "Building image..."
39
        composeSkipImageBuildingPromptMsg = "Skipping building image..."
40
        deploymentHeaderMsg               = "Authenticated to %s \n\n"
41

42
        warningInvalidImageNameMsg = "WARNING! The image in your Dockerfile '%s' is not based on Astro Runtime and is not supported. Change your Dockerfile with an image that pulls from 'quay.io/astronomer/astro-runtime' to proceed.\n"
43

44
        allTests                 = "all-tests"
45
        parseAndPytest           = "parse-and-all-tests"
46
        enableDagDeployMsg       = "DAG-only deploys are not enabled for this Deployment. Run 'astro deployment update %s --dag-deploy enable' to enable DAG-only deploys"
47
        dagDeployDisabled        = "dag deploy is not enabled for deployment"
48
        invalidWorkspaceID       = "Invalid workspace id %s was provided through the --workspace-id flag\n"
49
        errCiCdEnforcementUpdate = "cannot deploy since ci/cd enforcement is enabled for the deployment %s. Please use API Tokens instead"
50
)
51

52
// Package-level state for the deploy flow. pytestFile is set by checkPytest
// when a specific test file is requested; dagsUploadURL, nextTag and
// dagTarballVersion are overwritten by Deploy on each run.
var (
53
        pytestFile string
54
        dockerfile = "Dockerfile"
55

56
        deployImagePlatformSupport = []string{"linux/amd64"}
57

58
        // Monkey patched to write unit tests
59
        airflowImageHandler  = airflow.ImageHandlerInit
60
        containerHandlerInit = airflow.ContainerHandlerInit
61
        azureUploader        = azure.Upload
62
        canCiCdDeploy        = deployment.CanCiCdDeploy
63
        dagTarballVersion    = ""
64
        dagsUploadURL        = ""
65
        nextTag              = ""
66
)
67

68
// Sentinel errors returned by the deploy flow. errDagsParseFailed is returned
// by parseDAGs when the container handler reports a parse error;
// envFileMissing is wrapped with the offending path by Deploy.
var (
69
        errDagsParseFailed = errors.New("your local DAGs did not parse. Fix the listed errors or use `astro deploy [deployment-id] -f` to force deploy") //nolint:revive
70
        envFileMissing     = errors.New("Env file path is incorrect: ")                                                                                  //nolint:revive
71
)
72

73
// Arguments passed to deployment.HealthPoll when Deploy waits for status:
// sleepTime for image deploys, dagOnlyDeploySleepTime for DAG-only deploys,
// followed by tickNum and timeoutNum.
// NOTE(review): units are not visible here (presumably seconds) — confirm
// against deployment.HealthPoll.
var (
74
        sleepTime              = 90
75
        dagOnlyDeploySleepTime = 30
76
        tickNum                = 10
77
        timeoutNum             = 180
78
)
79

80
// deploymentInfo is the subset of a Deployment's attributes that the deploy
// flow needs: identifiers, the local image name to build/push, the current
// runtime version, URLs, and the feature flags that steer the flow.
//
// Field order is significant wherever the struct is built from an unkeyed
// literal, so do not reorder fields without checking every construction site.
type deploymentInfo struct {
81
        deploymentID             string
82
        namespace                string
83
        deployImage              string
84
        currentVersion           string
85
        organizationID           string
86
        workspaceID              string
87
        webserverURL             string
88
        deploymentType           string
89
        desiredDagTarballVersion string
90
        dagDeployEnabled         bool
91
        cicdEnforcement          bool
92
        name                     string
93
        isRemoteExecutionEnabled bool
94
}
95

96
// InputDeploy carries the user-supplied options for Deploy.
//
// As used by Deploy: Path is the project directory (the Dockerfile,
// .dockerignore and default "dags" folder are resolved against it); RuntimeID,
// WsID and DeploymentName select the target Deployment; Prompt forces the
// interactive Deployment lookup; Dags requests a DAG-only deploy and Image an
// image-only deploy; Pytest selects which tests run; EnvFile must exist unless
// it is ".env"; ImageName, when set, skips the build and tags that local image;
// DagsPath overrides the default dags folder; WaitForStatus makes Deploy poll
// Deployment health before returning; Description is sent with the deploy
// request; BuildSecretString and ForceUpgradeToAF3 are forwarded to the image
// build.
type InputDeploy struct {
97
        Path              string
98
        RuntimeID         string
99
        WsID              string
100
        Pytest            string
101
        EnvFile           string
102
        ImageName         string
103
        DeploymentName    string
104
        Prompt            bool
105
        Dags              bool
106
        Image             bool
107
        WaitForStatus     bool
108
        DagsPath          string
109
        Description       string
110
        BuildSecretString string
111
        ForceUpgradeToAF3 bool
112
}
113

114
// accessYourDeploymentFmt is the footer appended to deploy success messages.
// It takes two %s arguments, in order: the Deployment view URL and the Airflow
// UI URL.
const accessYourDeploymentFmt = `
115

116
 Access your Deployment:
117

118
 Deployment View: %s
119
 Airflow UI: %s
120
`
121

122
// removeDagsFromDockerIgnore deletes every line that is exactly "dags/" from
// the .dockerignore file at fullpath.
//
// The file is rewritten only when at least one such line was found, so a file
// without a dags entry is left byte-for-byte untouched (no stripped trailing
// newline, no normalised line endings, and no write error on a read-only
// file). When a rewrite happens, the remaining lines are joined with "\n" and
// leading/trailing newlines are trimmed.
//
// It returns any error from opening, scanning or rewriting the file.
func removeDagsFromDockerIgnore(fullpath string) error {
	f, err := os.Open(fullpath)
	if err != nil {
		return err
	}
	defer f.Close()

	var kept bytes.Buffer
	removed := false

	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		line := scanner.Text()
		if line == "dags/" {
			removed = true
			continue
		}
		// bytes.Buffer writes never return a non-nil error.
		kept.WriteString(line)
		kept.WriteByte('\n')
	}
	if err := scanner.Err(); err != nil {
		return err
	}

	// Nothing to remove: leave the user's file exactly as it was.
	if !removed {
		return nil
	}

	return os.WriteFile(fullpath, bytes.Trim(kept.Bytes(), "\n"), 0o666) //nolint:gosec, mnd
}
154

155
func shouldIncludeMonitoringDag(deploymentType astroplatformcore.DeploymentType) bool {
20✔
156
        return !organization.IsOrgHosted() && !deployment.IsDeploymentDedicated(deploymentType) && !deployment.IsDeploymentStandard(deploymentType)
20✔
157
}
20✔
158

159
func deployDags(path, dagsPath, dagsUploadURL, currentRuntimeVersion string, deploymentType astroplatformcore.DeploymentType) (string, error) {
20✔
160
        if shouldIncludeMonitoringDag(deploymentType) {
36✔
161
                monitoringDagPath := filepath.Join(dagsPath, "astronomer_monitoring_dag.py")
16✔
162

16✔
163
                // Create monitoring dag file
16✔
164
                err := fileutil.WriteStringToFile(monitoringDagPath, airflow.Af2MonitoringDag)
16✔
165
                if err != nil {
16✔
166
                        return "", err
×
167
                }
×
168

169
                // Remove the monitoring dag file after the upload
170
                defer os.Remove(monitoringDagPath)
16✔
171
        }
172

173
        versionID, err := UploadBundle(path, dagsPath, dagsUploadURL, true, currentRuntimeVersion)
20✔
174
        if err != nil {
20✔
175
                return "", err
×
176
        }
×
177

178
        return versionID, nil
20✔
179
}
180

181
// Deploy pushes a new docker image
182
func Deploy(deployInput InputDeploy, platformCoreClient astroplatformcore.CoreClient, coreClient astrocore.CoreClient) error { //nolint
41✔
183
        c, err := config.GetCurrentContext()
41✔
184
        if err != nil {
42✔
185
                return err
1✔
186
        }
1✔
187

188
        if c.Domain == astroDomain {
43✔
189
                fmt.Printf(deploymentHeaderMsg, "Astro")
3✔
190
        } else {
40✔
191
                fmt.Printf(deploymentHeaderMsg, c.Domain)
37✔
192
        }
37✔
193

194
        deployInfo, err := getDeploymentInfo(deployInput.RuntimeID, deployInput.WsID, deployInput.DeploymentName, deployInput.Prompt, platformCoreClient, coreClient)
40✔
195
        if err != nil {
40✔
196
                return err
×
197
        }
×
198

199
        var dagsPath string
40✔
200
        if deployInput.DagsPath != "" {
55✔
201
                dagsPath = deployInput.DagsPath
15✔
202
        } else {
40✔
203
                dagsPath = filepath.Join(deployInput.Path, "dags")
25✔
204
        }
25✔
205

206
        var dagFiles []string
40✔
207
        if !deployInfo.isRemoteExecutionEnabled {
75✔
208
                dagFiles = fileutil.GetFilesWithSpecificExtension(dagsPath, ".py")
35✔
209
        }
35✔
210

211
        if deployInfo.cicdEnforcement {
41✔
212
                if !canCiCdDeploy(c.Token) {
2✔
213
                        return fmt.Errorf(errCiCdEnforcementUpdate, deployInfo.name) //nolint
1✔
214
                }
1✔
215
        }
216

217
        if deployInput.WsID != deployInfo.workspaceID {
40✔
218
                fmt.Printf(invalidWorkspaceID, deployInput.WsID)
1✔
219
                return nil
1✔
220
        }
1✔
221

222
        if deployInput.Image {
41✔
223
                if !deployInfo.dagDeployEnabled {
3✔
224
                        return fmt.Errorf(enableDagDeployMsg, deployInfo.deploymentID) //nolint
×
225
                }
×
226
        }
227

228
        deploymentURL, err := deployment.GetDeploymentURL(deployInfo.deploymentID, deployInfo.workspaceID)
38✔
229
        if err != nil {
38✔
230
                return err
×
231
        }
×
232
        createDeployRequest := astroplatformcore.CreateDeployRequest{
38✔
233
                Description: &deployInput.Description,
38✔
234
        }
38✔
235
        switch {
38✔
236
        case deployInput.Dags:
18✔
237
                createDeployRequest.Type = astroplatformcore.CreateDeployRequestTypeDAGONLY
18✔
238
        case deployInput.Image:
1✔
239
                createDeployRequest.Type = astroplatformcore.CreateDeployRequestTypeIMAGEONLY
1✔
240
        default:
19✔
241
                createDeployRequest.Type = astroplatformcore.CreateDeployRequestTypeIMAGEANDDAG
19✔
242
        }
243
        deploy, err := createDeploy(deployInfo.organizationID, deployInfo.deploymentID, createDeployRequest, platformCoreClient)
38✔
244
        if err != nil {
38✔
245
                return err
×
246
        }
×
247
        deployID := deploy.Id
38✔
248
        if deploy.DagsUploadUrl != nil {
76✔
249
                dagsUploadURL = *deploy.DagsUploadUrl
38✔
250
        } else {
38✔
251
                dagsUploadURL = ""
×
252
        }
×
253
        if deploy.ImageTag != "" {
38✔
254
                nextTag = deploy.ImageTag
×
255
        } else {
38✔
256
                nextTag = ""
38✔
257
        }
38✔
258

259
        if deployInput.Dags {
56✔
260
                if len(dagFiles) == 0 && config.CFG.ShowWarnings.GetBool() {
19✔
261
                        i, _ := input.Confirm("Warning: No DAGs found. This will delete any existing DAGs. Are you sure you want to deploy?")
1✔
262

1✔
263
                        if !i {
2✔
264
                                fmt.Println("Canceling deploy...")
1✔
265
                                return nil
1✔
266
                        }
1✔
267
                }
268
                if deployInput.Pytest != "" {
29✔
269
                        runtimeVersion, err := buildImage(deployInput.Path, deployInfo.currentVersion, deployInfo.deployImage, deployInput.ImageName, deployInfo.organizationID, deployInput.BuildSecretString, deployInfo.dagDeployEnabled, deployInfo.isRemoteExecutionEnabled, deployInput.ForceUpgradeToAF3, platformCoreClient)
12✔
270
                        if err != nil {
12✔
271
                                return err
×
272
                        }
×
273

274
                        err = parseOrPytestDAG(deployInput.Pytest, runtimeVersion, deployInput.EnvFile, deployInfo.deployImage, deployInfo.namespace, deployInput.BuildSecretString)
12✔
275
                        if err != nil {
14✔
276
                                return err
2✔
277
                        }
2✔
278
                }
279

280
                if !deployInfo.dagDeployEnabled {
16✔
281
                        return fmt.Errorf(enableDagDeployMsg, deployInfo.deploymentID) //nolint
1✔
282
                }
1✔
283

284
                fmt.Println("Initiating DAG deploy for: " + deployInfo.deploymentID)
14✔
285
                dagTarballVersion, err = deployDags(deployInput.Path, dagsPath, dagsUploadURL, deployInfo.currentVersion, astroplatformcore.DeploymentType(deployInfo.deploymentType))
14✔
286
                if err != nil {
14✔
287
                        if strings.Contains(err.Error(), dagDeployDisabled) {
×
288
                                return fmt.Errorf(enableDagDeployMsg, deployInfo.deploymentID) //nolint
×
289
                        }
×
290

291
                        return err
×
292
                }
293

294
                // finish deploy
295
                err = finalizeDeploy(deployID, deployInfo.deploymentID, deployInfo.organizationID, dagTarballVersion, deployInfo.dagDeployEnabled, platformCoreClient)
14✔
296
                if err != nil {
14✔
297
                        return err
×
298
                }
×
299

300
                if deployInput.WaitForStatus {
15✔
301
                        // Keeping wait timeout low since dag only deploy is faster
1✔
302
                        err = deployment.HealthPoll(deployInfo.deploymentID, deployInfo.workspaceID, dagOnlyDeploySleepTime, tickNum, timeoutNum, platformCoreClient)
1✔
303
                        if err != nil {
2✔
304
                                return err
1✔
305
                        }
1✔
306

307
                        fmt.Println(
×
308
                                "\nSuccessfully uploaded DAGs with version " + ansi.Bold(dagTarballVersion) + " to Astro. Navigate to the Airflow UI to confirm that your deploy was successful." +
×
309
                                        fmt.Sprintf(accessYourDeploymentFmt, ansi.Bold(deploymentURL), ansi.Bold(deployInfo.webserverURL)),
×
310
                        )
×
311

×
312
                        return nil
×
313
                }
314

315
                fmt.Println(
13✔
316
                        "\nSuccessfully uploaded DAGs with version " + ansi.Bold(
13✔
317
                                dagTarballVersion,
13✔
318
                        ) + " to Astro. Navigate to the Airflow UI to confirm that your deploy was successful. The Airflow UI takes about 1 minute to update." +
13✔
319
                                fmt.Sprintf(
13✔
320
                                        accessYourDeploymentFmt,
13✔
321
                                        ansi.Bold(deploymentURL),
13✔
322
                                        ansi.Bold(deployInfo.webserverURL),
13✔
323
                                ),
13✔
324
                )
13✔
325
        } else {
20✔
326
                fullpath := filepath.Join(deployInput.Path, ".dockerignore")
20✔
327
                fileExist, _ := fileutil.Exists(fullpath, nil)
20✔
328
                if fileExist {
40✔
329
                        err := removeDagsFromDockerIgnore(fullpath)
20✔
330
                        if err != nil {
20✔
331
                                return errors.Wrap(err, "Found dags entry in .dockerignore file. Remove this entry and try again")
×
332
                        }
×
333
                }
334
                envFileExists, _ := fileutil.Exists(deployInput.EnvFile, nil)
20✔
335
                if !envFileExists && deployInput.EnvFile != ".env" {
21✔
336
                        return fmt.Errorf("%w %s", envFileMissing, deployInput.EnvFile)
1✔
337
                }
1✔
338

339
                if deployInfo.dagDeployEnabled && len(dagFiles) == 0 && config.CFG.ShowWarnings.GetBool() && !deployInput.Image {
19✔
340
                        i, _ := input.Confirm("Warning: No DAGs found. This will delete any existing DAGs. Are you sure you want to deploy?")
×
341

×
342
                        if !i {
×
343
                                fmt.Println("Canceling deploy...")
×
344
                                return nil
×
345
                        }
×
346
                }
347

348
                // Build our image
349
                runtimeVersion, err := buildImage(deployInput.Path, deployInfo.currentVersion, deployInfo.deployImage, deployInput.ImageName, deployInfo.organizationID, deployInput.BuildSecretString, deployInfo.dagDeployEnabled, deployInfo.isRemoteExecutionEnabled, deployInput.ForceUpgradeToAF3, platformCoreClient)
19✔
350
                if err != nil {
19✔
351
                        return err
×
352
                }
×
353

354
                if len(dagFiles) > 0 {
26✔
355
                        err = parseOrPytestDAG(deployInput.Pytest, runtimeVersion, deployInput.EnvFile, deployInfo.deployImage, deployInfo.namespace, deployInput.BuildSecretString)
7✔
356
                        if err != nil {
8✔
357
                                return err
1✔
358
                        }
1✔
359
                } else {
12✔
360
                        fmt.Println("No DAGs found. Skipping testing...")
12✔
361
                }
12✔
362

363
                repository := deploy.ImageRepository
18✔
364
                // TODO: Resolve the edge case where two people push the same nextTag at the same time
18✔
365
                remoteImage := fmt.Sprintf("%s:%s", repository, nextTag)
18✔
366

18✔
367
                imageHandler := airflowImageHandler(deployInfo.deployImage)
18✔
368
                _, err = imageHandler.Push(remoteImage, registryUsername, c.Token, false)
18✔
369
                if err != nil {
18✔
370
                        return err
×
371
                }
×
372

373
                if deployInfo.dagDeployEnabled && len(dagFiles) > 0 {
24✔
374
                        if !deployInput.Image {
12✔
375
                                dagTarballVersion, err = deployDags(deployInput.Path, dagsPath, dagsUploadURL, deployInfo.currentVersion, astroplatformcore.DeploymentType(deployInfo.deploymentType))
6✔
376
                                if err != nil {
6✔
377
                                        return err
×
378
                                }
×
379
                        } else {
×
380
                                fmt.Println("Image Deploy only. Skipping deploying DAG...")
×
381
                        }
×
382
                }
383
                // finish deploy
384
                err = finalizeDeploy(deployID, deployInfo.deploymentID, deployInfo.organizationID, dagTarballVersion, deployInfo.dagDeployEnabled, platformCoreClient)
18✔
385
                if err != nil {
18✔
386
                        return err
×
387
                }
×
388

389
                if deployInput.WaitForStatus {
20✔
390
                        err = deployment.HealthPoll(deployInfo.deploymentID, deployInfo.workspaceID, sleepTime, tickNum, timeoutNum, platformCoreClient)
2✔
391
                        if err != nil {
4✔
392
                                return err
2✔
393
                        }
2✔
394
                }
395

396
                fmt.Println("Successfully pushed image to Astronomer registry. Navigate to the Astronomer UI for confirmation that your deploy was successful. To deploy dags only run astro deploy --dags." +
16✔
397
                        fmt.Sprintf(accessYourDeploymentFmt, ansi.Bold("https://"+deploymentURL), ansi.Bold("https://"+deployInfo.webserverURL)))
16✔
398
        }
399

400
        return nil
29✔
401
}
402

403
func getDeploymentInfo(
404
        deploymentID, wsID, deploymentName string,
405
        prompt bool,
406
        platformCoreClient astroplatformcore.CoreClient,
407
        coreClient astrocore.CoreClient,
408
) (deploymentInfo, error) {
40✔
409
        // Use config deployment if provided
40✔
410
        if deploymentID == "" {
54✔
411
                deploymentID = config.CFG.ProjectDeployment.GetProjectString()
14✔
412
                if deploymentID != "" {
14✔
413
                        fmt.Printf("Deployment ID found in the config file. This Deployment ID will be used for the deploy\n")
×
414
                }
×
415
        }
416

417
        if deploymentID != "" && deploymentName != "" {
48✔
418
                fmt.Printf("Both a Deployment ID and Deployment name have been supplied. The Deployment ID %s will be used for the Deploy\n", deploymentID)
8✔
419
        }
8✔
420

421
        // check if deploymentID or if force prompt was requested was given by user
422
        if deploymentID == "" || prompt {
67✔
423
                currentDeployment, err := deployment.GetDeployment(wsID, deploymentID, deploymentName, false, nil, platformCoreClient, coreClient)
27✔
424
                if err != nil {
27✔
425
                        return deploymentInfo{}, err
×
426
                }
×
427
                coreDeployment, err := deployment.CoreGetDeployment(currentDeployment.OrganizationId, currentDeployment.Id, platformCoreClient)
27✔
428
                if err != nil {
27✔
429
                        return deploymentInfo{}, err
×
430
                }
×
431
                var desiredDagTarballVersion string
27✔
432
                if coreDeployment.DesiredDagTarballVersion != nil {
45✔
433
                        desiredDagTarballVersion = *coreDeployment.DesiredDagTarballVersion
18✔
434
                } else {
27✔
435
                        desiredDagTarballVersion = ""
9✔
436
                }
9✔
437

438
                return deploymentInfo{
27✔
439
                        currentDeployment.Id,
27✔
440
                        currentDeployment.Namespace,
27✔
441
                        airflow.ImageName(currentDeployment.Namespace, "latest"),
27✔
442
                        currentDeployment.RuntimeVersion,
27✔
443
                        currentDeployment.OrganizationId,
27✔
444
                        currentDeployment.WorkspaceId,
27✔
445
                        currentDeployment.WebServerUrl,
27✔
446
                        string(*currentDeployment.Type),
27✔
447
                        desiredDagTarballVersion,
27✔
448
                        currentDeployment.IsDagDeployEnabled,
27✔
449
                        currentDeployment.IsCicdEnforced,
27✔
450
                        currentDeployment.Name,
27✔
451
                        deployment.IsRemoteExecutionEnabled(&currentDeployment),
27✔
452
                }, nil
27✔
453
        }
454
        c, err := config.GetCurrentContext()
13✔
455
        if err != nil {
13✔
456
                return deploymentInfo{}, err
×
457
        }
×
458
        deployInfo, err := getImageName(deploymentID, c.Organization, platformCoreClient)
13✔
459
        if err != nil {
13✔
460
                return deploymentInfo{}, err
×
461
        }
×
462
        deployInfo.deploymentID = deploymentID
13✔
463
        return deployInfo, nil
13✔
464
}
465

466
func parseOrPytestDAG(pytest, runtimeVersion, envFile, deployImage, namespace, buildSecretString string) error {
19✔
467
        validDAGParseVersion := airflowversions.CompareRuntimeVersions(runtimeVersion, dagParseAllowedVersion) >= 0
19✔
468
        if !validDAGParseVersion {
19✔
469
                fmt.Println("\nruntime image is earlier than 4.1.0, this deploy will skip DAG parse...")
×
470
        }
×
471

472
        containerHandler, err := containerHandlerInit(config.WorkingPath, envFile, "Dockerfile", namespace)
19✔
473
        if err != nil {
19✔
474
                return err
×
475
        }
×
476

477
        switch {
19✔
478
        case pytest == parse && validDAGParseVersion:
7✔
479
                // parse dags
7✔
480
                fmt.Println("Testing image...")
7✔
481
                err := parseDAGs(deployImage, buildSecretString, containerHandler)
7✔
482
                if err != nil {
9✔
483
                        return err
2✔
484
                }
2✔
485
        case pytest != "" && pytest != parse && pytest != parseAndPytest:
6✔
486
                // check pytests
6✔
487
                fmt.Println("Testing image...")
6✔
488
                err := checkPytest(pytest, deployImage, buildSecretString, containerHandler)
6✔
489
                if err != nil {
7✔
490
                        return err
1✔
491
                }
1✔
492
        case pytest == parseAndPytest:
6✔
493
                // parse dags and check pytests
6✔
494
                fmt.Println("Testing image...")
6✔
495
                err := parseDAGs(deployImage, buildSecretString, containerHandler)
6✔
496
                if err != nil {
6✔
497
                        return err
×
498
                }
×
499

500
                err = checkPytest(pytest, deployImage, buildSecretString, containerHandler)
6✔
501
                if err != nil {
6✔
502
                        return err
×
503
                }
×
504
        }
505
        return nil
16✔
506
}
507

508
func parseDAGs(deployImage, buildSecretString string, containerHandler airflow.ContainerHandler) error {
13✔
509
        if !config.CFG.SkipParse.GetBool() && !util.CheckEnvBool(os.Getenv("ASTRONOMER_SKIP_PARSE")) {
26✔
510
                err := containerHandler.Parse("", deployImage, buildSecretString)
13✔
511
                if err != nil {
15✔
512
                        fmt.Println(err)
2✔
513
                        return errDagsParseFailed
2✔
514
                }
2✔
515
        } else {
×
516
                fmt.Println("Skipping parsing dags due to skip parse being set to true in either the config.yaml or local environment variables")
×
517
        }
×
518

519
        return nil
11✔
520
}
521

522
// Validate code with pytest
523
func checkPytest(pytest, deployImage, buildSecretString string, containerHandler airflow.ContainerHandler) error {
14✔
524
        if pytest != allTests && pytest != parseAndPytest {
18✔
525
                pytestFile = pytest
4✔
526
        }
4✔
527

528
        exitCode, err := containerHandler.Pytest(pytestFile, "", deployImage, "", buildSecretString)
14✔
529
        if err != nil {
17✔
530
                if strings.Contains(exitCode, "1") { // exit code is 1 meaning tests failed
4✔
531
                        return errors.New("at least 1 pytest in your tests directory failed. Fix the issues listed or rerun the command without the '--pytest' flag to deploy")
1✔
532
                }
1✔
533
                return errors.Wrap(err, "Something went wrong while Pytesting your DAGs,\nif the issue persists rerun the command without the '--pytest' flag to deploy")
2✔
534
        }
535

536
        fmt.Print("\nAll Pytests passed!\n")
11✔
537
        return err
11✔
538
}
539

540
func getImageName(deploymentID, organizationID string, platformCoreClient astroplatformcore.CoreClient) (deploymentInfo, error) {
13✔
541
        resp, err := platformCoreClient.GetDeploymentWithResponse(httpContext.Background(), organizationID, deploymentID)
13✔
542
        if err != nil {
13✔
543
                return deploymentInfo{}, err
×
544
        }
×
545

546
        err = astrocore.NormalizeAPIError(resp.HTTPResponse, resp.Body)
13✔
547
        if err != nil {
13✔
548
                return deploymentInfo{}, err
×
549
        }
×
550

551
        currentVersion := resp.JSON200.RuntimeVersion
13✔
552
        namespace := resp.JSON200.Namespace
13✔
553
        workspaceID := resp.JSON200.WorkspaceId
13✔
554
        webserverURL := resp.JSON200.WebServerUrl
13✔
555
        dagDeployEnabled := resp.JSON200.IsDagDeployEnabled
13✔
556
        cicdEnforcement := resp.JSON200.IsCicdEnforced
13✔
557
        isRemoteExecutionEnabled := deployment.IsRemoteExecutionEnabled(resp.JSON200)
13✔
558
        var desiredDagTarballVersion string
13✔
559
        if resp.JSON200.DesiredDagTarballVersion != nil {
18✔
560
                desiredDagTarballVersion = *resp.JSON200.DesiredDagTarballVersion
5✔
561
        } else {
13✔
562
                desiredDagTarballVersion = ""
8✔
563
        }
8✔
564

565
        // We use latest and keep this tag around after deployments to keep subsequent deploys quick
566
        deployImage := airflow.ImageName(namespace, "latest")
13✔
567

13✔
568
        return deploymentInfo{
13✔
569
                namespace:                namespace,
13✔
570
                deployImage:              deployImage,
13✔
571
                currentVersion:           currentVersion,
13✔
572
                organizationID:           organizationID,
13✔
573
                workspaceID:              workspaceID,
13✔
574
                webserverURL:             webserverURL,
13✔
575
                dagDeployEnabled:         dagDeployEnabled,
13✔
576
                desiredDagTarballVersion: desiredDagTarballVersion,
13✔
577
                cicdEnforcement:          cicdEnforcement,
13✔
578
                isRemoteExecutionEnabled: isRemoteExecutionEnabled,
13✔
579
        }, nil
13✔
580
}
581

582
func buildImageWithoutDags(path, buildSecretString string, imageHandler airflow.ImageHandler) error {
19✔
583
        // flag to determine if we are setting the dags folder in dockerignore
19✔
584
        dagsIgnoreSet := false
19✔
585
        // flag to determine if dockerignore file was created on runtime
19✔
586
        dockerIgnoreCreate := false
19✔
587
        fullpath := filepath.Join(path, ".dockerignore")
19✔
588

19✔
589
        defer func() {
38✔
590
                // remove dags from .dockerignore file if we set it
19✔
591
                if dagsIgnoreSet {
38✔
592
                        removeDagsFromDockerIgnore(fullpath) //nolint:errcheck
19✔
593
                }
19✔
594
                // remove created docker ignore file
595
                if dockerIgnoreCreate {
19✔
596
                        os.Remove(fullpath)
×
597
                }
×
598
        }()
599

600
        fileExist, _ := fileutil.Exists(fullpath, nil)
19✔
601
        if !fileExist {
19✔
602
                // Create a dockerignore file and add the dags folder entry
×
603
                err := fileutil.WriteStringToFile(fullpath, "dags/")
×
604
                if err != nil {
×
605
                        return err
×
606
                }
×
607
                dockerIgnoreCreate = true
×
608
        }
609
        lines, err := fileutil.Read(fullpath)
19✔
610
        if err != nil {
19✔
611
                return err
×
612
        }
×
613
        contains, _ := fileutil.Contains(lines, "dags/")
19✔
614
        if !contains {
38✔
615
                f, err := os.OpenFile(fullpath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) //nolint:mnd
19✔
616
                if err != nil {
19✔
617
                        return err
×
618
                }
×
619

620
                defer f.Close()
19✔
621

19✔
622
                if _, err := f.WriteString("\ndags/"); err != nil {
19✔
623
                        return err
×
624
                }
×
625

626
                dagsIgnoreSet = true
19✔
627
        }
628
        err = imageHandler.Build("", buildSecretString, types.ImageBuildConfig{Path: path, TargetPlatforms: deployImagePlatformSupport})
19✔
629
        if err != nil {
19✔
630
                return err
×
631
        }
×
632

633
        // remove dags from .dockerignore file if we set it
634
        if dagsIgnoreSet {
38✔
635
                err = removeDagsFromDockerIgnore(fullpath)
19✔
636
                if err != nil {
19✔
637
                        return err
×
638
                }
×
639
        }
640

641
        return nil
19✔
642
}
643

644
func buildImage(path, currentVersion, deployImage, imageName, organizationID, buildSecretString string, dagDeployEnabled, isRemoteExecutionEnabled, forceUpgradeToAF3 bool, platformCoreClient astroplatformcore.CoreClient) (version string, err error) {
34✔
645
        imageHandler := airflowImageHandler(deployImage)
34✔
646

34✔
647
        if imageName == "" {
61✔
648
                // Build our image
27✔
649
                fmt.Println(composeImageBuildingPromptMsg)
27✔
650

27✔
651
                if dagDeployEnabled || isRemoteExecutionEnabled {
46✔
652
                        err := buildImageWithoutDags(path, buildSecretString, imageHandler)
19✔
653
                        if err != nil {
19✔
654
                                return "", err
×
655
                        }
×
656
                } else {
8✔
657
                        err := imageHandler.Build("", buildSecretString, types.ImageBuildConfig{Path: path, TargetPlatforms: deployImagePlatformSupport})
8✔
658
                        if err != nil {
9✔
659
                                return "", err
1✔
660
                        }
1✔
661
                }
662
        } else {
7✔
663
                // skip build if an imageName is passed
7✔
664
                fmt.Println(composeSkipImageBuildingPromptMsg)
7✔
665

7✔
666
                err := imageHandler.TagLocalImage(imageName)
7✔
667
                if err != nil {
7✔
668
                        return "", err
×
669
                }
×
670
        }
671

672
        // parse dockerfile
673
        cmds, err := docker.ParseFile(filepath.Join(path, dockerfile))
33✔
674
        if err != nil {
34✔
675
                return "", errors.Wrapf(err, "failed to parse dockerfile: %s", filepath.Join(path, dockerfile))
1✔
676
        }
1✔
677

678
        DockerfileImage := docker.GetImageFromParsedFile(cmds)
32✔
679

32✔
680
        version, err = imageHandler.GetLabel("", runtimeImageLabel)
32✔
681
        if err != nil {
32✔
682
                fmt.Println("unable get runtime version from image")
×
683
        }
×
684

685
        if config.CFG.ShowWarnings.GetBool() && version == "" {
32✔
686
                fmt.Printf(warningInvalidImageNameMsg, DockerfileImage)
×
687
                fmt.Println("Canceling deploy...")
×
688
                os.Exit(1)
×
689
        }
×
690

691
        resp, err := platformCoreClient.GetDeploymentOptionsWithResponse(httpContext.Background(), organizationID, &astroplatformcore.GetDeploymentOptionsParams{})
32✔
692
        if err != nil {
33✔
693
                return "", err
1✔
694
        }
1✔
695
        err = astrocore.NormalizeAPIError(resp.HTTPResponse, resp.Body)
31✔
696
        if err != nil {
31✔
697
                return "", err
×
698
        }
×
699
        deploymentOptionsRuntimeVersions := []string{}
31✔
700
        for _, runtimeRelease := range resp.JSON200.RuntimeReleases {
217✔
701
                deploymentOptionsRuntimeVersions = append(deploymentOptionsRuntimeVersions, runtimeRelease.Version)
186✔
702
        }
186✔
703

704
        if !ValidRuntimeVersion(currentVersion, version, deploymentOptionsRuntimeVersions, forceUpgradeToAF3) {
31✔
705
                fmt.Println("Canceling deploy...")
×
706
                os.Exit(1)
×
707
        }
×
708

709
        WarnIfNonLatestVersion(version, httputil.NewHTTPClient())
31✔
710

31✔
711
        return version, nil
31✔
712
}
713

714
// finalize deploy
715
func finalizeDeploy(deployID, deploymentID, organizationID, dagTarballVersion string, dagDeploy bool, platformCoreClient astroplatformcore.CoreClient) error {
32✔
716
        finalizeDeployRequest := astroplatformcore.FinalizeDeployRequest{}
32✔
717
        if dagDeploy {
54✔
718
                finalizeDeployRequest.DagTarballVersion = &dagTarballVersion
22✔
719
        }
22✔
720
        resp, err := platformCoreClient.FinalizeDeployWithResponse(httpContext.Background(), organizationID, deploymentID, deployID, finalizeDeployRequest)
32✔
721
        if err != nil {
32✔
722
                return err
×
723
        }
×
724
        err = astrocore.NormalizeAPIError(resp.HTTPResponse, resp.Body)
32✔
725
        if err != nil {
32✔
726
                return err
×
727
        }
×
728
        if resp.JSON200.DagTarballVersion != nil {
64✔
729
                fmt.Println("Deployed DAG bundle: ", *resp.JSON200.DagTarballVersion)
32✔
730
        }
32✔
731
        if resp.JSON200.ImageTag != "" {
64✔
732
                fmt.Println("Deployed Image Tag: ", resp.JSON200.ImageTag)
32✔
733
        }
32✔
734
        return nil
32✔
735
}
736

737
func createDeploy(organizationID, deploymentID string, request astroplatformcore.CreateDeployRequest, platformCoreClient astroplatformcore.CoreClient) (*astroplatformcore.Deploy, error) {
38✔
738
        resp, err := platformCoreClient.CreateDeployWithResponse(httpContext.Background(), organizationID, deploymentID, request)
38✔
739
        if err != nil {
38✔
740
                return nil, err
×
741
        }
×
742
        err = astrocore.NormalizeAPIError(resp.HTTPResponse, resp.Body)
38✔
743
        if err != nil {
38✔
744
                return nil, err
×
745
        }
×
746
        return resp.JSON200, err
38✔
747
}
748

749
func ValidRuntimeVersion(currentVersion, tag string, deploymentOptionsRuntimeVersions []string, forceUpgradeToAF3 bool) bool {
42✔
750
        // Allow old deployments which do not have runtimeVersion tag
42✔
751
        if currentVersion == "" {
43✔
752
                return true
1✔
753
        }
1✔
754

755
        // Check that the tag is not a downgrade
756
        if airflowversions.CompareRuntimeVersions(tag, currentVersion) < 0 {
44✔
757
                fmt.Printf("Cannot deploy a downgraded Astro Runtime version. Modify your Astro Runtime version to %s or higher in your Dockerfile\n", currentVersion)
3✔
758
                return false
3✔
759
        }
3✔
760

761
        // Check that the tag is supported by the deployment
762
        tagInDeploymentOptions := false
38✔
763
        for _, runtimeVersion := range deploymentOptionsRuntimeVersions {
102✔
764
                if airflowversions.CompareRuntimeVersions(tag, runtimeVersion) == 0 {
101✔
765
                        tagInDeploymentOptions = true
37✔
766
                        break
37✔
767
                }
768
        }
769
        if !tagInDeploymentOptions {
39✔
770
                fmt.Println("Cannot deploy an unsupported Astro Runtime version. Modify your Astro Runtime version to a supported version in your Dockerfile")
1✔
771
                fmt.Printf("Supported versions: %s\n", strings.Join(deploymentOptionsRuntimeVersions, ", "))
1✔
772
                return false
1✔
773
        }
1✔
774

775
        // If upgrading from Airflow 2 to Airflow 3, we require at least Runtime 12.0.0 (Airflow 2.10.0) and that the user has forced the upgrade
776
        currentVersionAirflowMajorVersion := airflowversions.AirflowMajorVersionForRuntimeVersion(currentVersion)
37✔
777
        tagAirflowMajorVersion := airflowversions.AirflowMajorVersionForRuntimeVersion(tag)
37✔
778
        if currentVersionAirflowMajorVersion == "2" && tagAirflowMajorVersion == "3" {
40✔
779
                if airflowversions.CompareRuntimeVersions(currentVersion, "12.0.0") < 0 {
4✔
780
                        fmt.Println("Can only upgrade deployment from Airflow 2 to Airflow 3 with deployment at Astro Runtime 12.0.0 or higher")
1✔
781
                        return false
1✔
782
                }
1✔
783
                if !forceUpgradeToAF3 {
3✔
784
                        fmt.Println("Can only upgrade deployment from Airflow 2 to Airflow 3 with the --force-upgrade-to-af3 flag")
1✔
785
                        return false
1✔
786
                }
1✔
787
        }
788

789
        return true
35✔
790
}
791

792
func WarnIfNonLatestVersion(version string, httpClient *httputil.HTTPClient) {
34✔
793
        client := airflowversions.NewClient(httpClient, false)
34✔
794
        latestRuntimeVersion, err := airflowversions.GetDefaultImageTag(client, "", false)
34✔
795
        if err != nil {
36✔
796
                logger.Debugf("unable to get latest runtime version: %s", err)
2✔
797
                return
2✔
798
        }
2✔
799

800
        if airflowversions.CompareRuntimeVersions(version, latestRuntimeVersion) < 0 {
64✔
801
                fmt.Printf("WARNING! You are currently running Astro Runtime Version %s\nConsider upgrading to the latest version, Astro Runtime %s\n", version, latestRuntimeVersion)
32✔
802
        }
32✔
803
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc