• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

astronomer / astro-cli / 6d8926d3-3f19-46b0-96ee-aa659994967c

09 Oct 2025 08:49AM UTC coverage: 38.492% (+0.05%) from 38.44%
6d8926d3-3f19-46b0-96ee-aa659994967c

Pull #1954

circleci

feluelle
Add client deploy support for RE projects

- add `--client` flag to deploy command
- add `--platform` flag to deploy command
Pull Request #1954: Add client deploy support for RE projects

79 of 90 new or added lines in 3 files covered. (87.78%)

54 existing lines in 5 files now uncovered.

24107 of 62628 relevant lines covered (38.49%)

10.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.34
/cloud/deploy/deploy.go
1
package deploy
2

3
import (
4
        "bufio"
5
        "bytes"
6
        httpContext "context"
7
        "fmt"
8
        "os"
9
        "path/filepath"
10
        "strings"
11
        "time"
12

13
        "github.com/astronomer/astro-cli/airflow"
14
        "github.com/astronomer/astro-cli/airflow/types"
15
        airflowversions "github.com/astronomer/astro-cli/airflow_versions"
16
        astrocore "github.com/astronomer/astro-cli/astro-client-core"
17
        astroplatformcore "github.com/astronomer/astro-cli/astro-client-platform-core"
18
        "github.com/astronomer/astro-cli/cloud/deployment"
19
        "github.com/astronomer/astro-cli/cloud/organization"
20
        "github.com/astronomer/astro-cli/config"
21
        "github.com/astronomer/astro-cli/docker"
22
        "github.com/astronomer/astro-cli/pkg/ansi"
23
        "github.com/astronomer/astro-cli/pkg/azure"
24
        "github.com/astronomer/astro-cli/pkg/fileutil"
25
        "github.com/astronomer/astro-cli/pkg/httputil"
26
        "github.com/astronomer/astro-cli/pkg/input"
27
        "github.com/astronomer/astro-cli/pkg/logger"
28
        "github.com/astronomer/astro-cli/pkg/util"
29
        "github.com/pkg/errors"
30
)
31

32
// Fixed values used by the deploy flow: pytest selector values, registry
// settings, and the templates for user-facing messages and errors.
const (
33
        // parse is the pytest selector for which parseOrPytestDAG runs only the DAG parse check.
        parse                  = "parse"
34
        // astroDomain is the context domain for which Deploy prints "Astro" in the header.
        astroDomain            = "astronomer.io"
35
        // registryUsername is the username handed to imageHandler.Push.
        registryUsername       = "cli"
36
        runtimeImageLabel      = airflow.RuntimeImageLabel
37
        // dagParseAllowedVersion is the runtime version parseOrPytestDAG compares against;
        // for older runtimes it prints that the DAG parse will be skipped.
        dagParseAllowedVersion = "4.1.0"
38

39
        composeImageBuildingPromptMsg     = "Building image..."
40
        composeSkipImageBuildingPromptMsg = "Skipping building image..."
41
        deploymentHeaderMsg               = "Authenticated to %s \n\n"
42

43
        warningInvalidImageNameMsg = "WARNING! The image in your Dockerfile '%s' is not based on Astro Runtime and is not supported. Change your Dockerfile with an image that pulls from 'quay.io/astronomer/astro-runtime' to proceed.\n"
44

45
        // allTests and parseAndPytest are pytest selector values; checkPytest keeps the
        // current pytestFile for both instead of treating them as a test file path.
        allTests                 = "all-tests"
46
        parseAndPytest           = "parse-and-all-tests"
47
        enableDagDeployMsg       = "DAG-only deploys are not enabled for this Deployment. Run 'astro deployment update %s --dag-deploy enable' to enable DAG-only deploys"
48
        // dagDeployDisabled is the substring Deploy looks for in a deployDags error
        // before replacing it with enableDagDeployMsg.
        dagDeployDisabled        = "dag deploy is not enabled for deployment"
49
        invalidWorkspaceID       = "Invalid workspace id %s was provided through the --workspace-id flag\n"
50
        errCiCdEnforcementUpdate = "cannot deploy since ci/cd enforcement is enabled for the deployment %s. Please use API Tokens instead"
51
)
52

53
// Package-level mutable state. pytestFile is set by checkPytest;
// dagTarballVersion, dagsUploadURL and nextTag are overwritten by Deploy.
var (
54
        pytestFile string
55
        // dockerfile is the file name buildImage joins onto the project path before parsing.
        dockerfile = "Dockerfile"
56

57
        // deployImagePlatformSupport is passed as TargetPlatforms to imageHandler.Build.
        deployImagePlatformSupport = []string{"linux/amd64"}
58

59
        // Function values kept in variables so unit tests can replace them.
60
        airflowImageHandler  = airflow.ImageHandlerInit
61
        containerHandlerInit = airflow.ContainerHandlerInit
62
        azureUploader        = azure.Upload
63
        canCiCdDeploy        = deployment.CanCiCdDeploy
64
        dagTarballVersion    = ""
65
        dagsUploadURL        = ""
66
        nextTag              = ""
67
)
68

69
// Sentinel errors. errDagsParseFailed is returned by parseDAGs when the
// container parse step fails; envFileMissing is wrapped by Deploy when the
// given env file does not exist and is not the default ".env".
var (
70
        errDagsParseFailed = errors.New("your local DAGs did not parse. Fix the listed errors or use `astro deploy [deployment-id] -f` to force deploy") //nolint:revive
71
        envFileMissing     = errors.New("Env file path is incorrect: ")                                                                                  //nolint:revive
72
)
73

74
// Arguments Deploy passes to deployment.HealthPoll when WaitForStatus is set:
// sleepTime for image deploys, dagOnlyDeploySleepTime for DAG-only deploys.
// NOTE(review): units are defined by HealthPoll (presumably seconds) — confirm.
var (
75
        sleepTime              = 90
76
        dagOnlyDeploySleepTime = 30
77
        tickNum                = 10
78
        timeoutNum             = 180
79
)
80

81
// deploymentInfo holds the attributes of the target Deployment that Deploy
// reads. It is produced by getDeploymentInfo (prompt path) or getImageName
// (explicit-ID path).
//
// Field order matters: getDeploymentInfo builds this struct with an unkeyed
// literal, so adding or reordering fields requires updating that literal.
type deploymentInfo struct {
82
        deploymentID             string
83
        namespace                string
84
        // deployImage is the local image name, built as airflow.ImageName(namespace, "latest").
        deployImage              string
85
        currentVersion           string
86
        organizationID           string
87
        workspaceID              string
88
        webserverURL             string
89
        deploymentType           string
90
        desiredDagTarballVersion string
91
        dagDeployEnabled         bool
92
        cicdEnforcement          bool
93
        name                     string
94
        isRemoteExecutionEnabled bool
95
}
96

97
// InputDeploy bundles the options Deploy receives from its caller.
type InputDeploy struct {
98
        // Path is the project directory; the Dockerfile, .dockerignore and default dags folder are resolved under it.
        Path              string
99
        // RuntimeID is passed to getDeploymentInfo as the Deployment ID.
        RuntimeID         string
100
        WsID              string
101
        // Pytest is the test selector forwarded to parseOrPytestDAG.
        Pytest            string
102
        EnvFile           string
103
        // ImageName, when non-empty, makes buildImage tag that local image instead of building.
        ImageName         string
104
        DeploymentName    string
105
        Prompt            bool
106
        // Dags selects a DAG-only deploy; Image selects an image-only deploy.
        Dags              bool
107
        Image             bool
108
        WaitForStatus     bool
109
        // DagsPath overrides the default "<Path>/dags" folder when non-empty.
        DagsPath          string
110
        Description       string
111
        BuildSecretString string
112
        ForceUpgradeToAF3 bool
113
        // ClientDeploy makes Deploy hand off to deployClientImage.
        ClientDeploy      bool
114
        // NOTE(review): Platform is not read in the visible part of this file — presumably used by the client deploy path; confirm.
        Platform          string
115
}
116

117
// accessYourDeploymentFmt is the footer appended to deploy success messages.
// Its two %s verbs receive the Deployment view URL and the Airflow UI URL,
// in that order.
const accessYourDeploymentFmt = `
118

119
 Access your Deployment:
120

121
 Deployment View: %s
122
 Airflow UI: %s
123
`
124

125
// removeDagsFromDockerIgnore rewrites the .dockerignore file at fullpath
// without its "dags/" entries.
//
// A line counts as the entry when it equals "dags/" after trimming
// surrounding whitespace, because Docker trims each pattern before applying
// it. All other lines are kept verbatim (line endings are normalised to "\n")
// and leading/trailing blank lines are dropped from the result.
//
// The file is read completely before it is rewritten, so no read handle is
// open on the path while it is being truncated. It returns any error from
// reading, scanning or writing the file.
func removeDagsFromDockerIgnore(fullpath string) error {
	content, err := os.ReadFile(fullpath)
	if err != nil {
		return err
	}

	var kept bytes.Buffer
	scanner := bufio.NewScanner(bytes.NewReader(content))
	for scanner.Scan() {
		line := scanner.Text()
		if strings.TrimSpace(line) == "dags/" {
			continue
		}
		// bytes.Buffer writes never fail, so there is no error to check here.
		kept.WriteString(line)
		kept.WriteByte('\n')
	}
	if err := scanner.Err(); err != nil {
		return err
	}

	return os.WriteFile(fullpath, bytes.Trim(kept.Bytes(), "\n"), 0o666) //nolint:gosec, mnd
}
157

158
func shouldIncludeMonitoringDag(deploymentType astroplatformcore.DeploymentType) bool {
20✔
159
        return !organization.IsOrgHosted() && !deployment.IsDeploymentDedicated(deploymentType) && !deployment.IsDeploymentStandard(deploymentType)
20✔
160
}
20✔
161

162
func deployDags(path, dagsPath, dagsUploadURL, currentRuntimeVersion string, deploymentType astroplatformcore.DeploymentType) (string, error) {
20✔
163
        if shouldIncludeMonitoringDag(deploymentType) {
36✔
164
                monitoringDagPath := filepath.Join(dagsPath, "astronomer_monitoring_dag.py")
16✔
165

16✔
166
                // Create monitoring dag file
16✔
167
                err := fileutil.WriteStringToFile(monitoringDagPath, airflow.Af2MonitoringDag)
16✔
168
                if err != nil {
16✔
169
                        return "", err
×
170
                }
×
171

172
                // Remove the monitoring dag file after the upload
173
                defer os.Remove(monitoringDagPath)
16✔
174
        }
175

176
        versionID, err := UploadBundle(path, dagsPath, dagsUploadURL, true, currentRuntimeVersion)
20✔
177
        if err != nil {
20✔
178
                return "", err
×
179
        }
×
180

181
        return versionID, nil
20✔
182
}
183

184
// Deploy pushes a new docker image
185
func Deploy(deployInput InputDeploy, platformCoreClient astroplatformcore.CoreClient, coreClient astrocore.CoreClient) error { //nolint
41✔
186
        c, err := config.GetCurrentContext()
41✔
187
        if err != nil {
42✔
188
                return err
1✔
189
        }
1✔
190

191
        if c.Domain == astroDomain {
43✔
192
                fmt.Printf(deploymentHeaderMsg, "Astro")
3✔
193
        } else {
40✔
194
                fmt.Printf(deploymentHeaderMsg, c.Domain)
37✔
195
        }
37✔
196

197
        if deployInput.ClientDeploy {
40✔
NEW
198
                return deployClientImage(deployInput, &c)
×
NEW
199
        }
×
200

201
        deployInfo, err := getDeploymentInfo(deployInput.RuntimeID, deployInput.WsID, deployInput.DeploymentName, deployInput.Prompt, platformCoreClient, coreClient)
40✔
202
        if err != nil {
40✔
203
                return err
×
204
        }
×
205

206
        var dagsPath string
40✔
207
        if deployInput.DagsPath != "" {
55✔
208
                dagsPath = deployInput.DagsPath
15✔
209
        } else {
40✔
210
                dagsPath = filepath.Join(deployInput.Path, "dags")
25✔
211
        }
25✔
212

213
        var dagFiles []string
40✔
214
        if !deployInfo.isRemoteExecutionEnabled {
75✔
215
                dagFiles = fileutil.GetFilesWithSpecificExtension(dagsPath, ".py")
35✔
216
        }
35✔
217

218
        if deployInfo.cicdEnforcement {
41✔
219
                if !canCiCdDeploy(c.Token) {
2✔
220
                        return fmt.Errorf(errCiCdEnforcementUpdate, deployInfo.name) //nolint
1✔
221
                }
1✔
222
        }
223

224
        if deployInput.WsID != deployInfo.workspaceID {
40✔
225
                fmt.Printf(invalidWorkspaceID, deployInput.WsID)
1✔
226
                return nil
1✔
227
        }
1✔
228

229
        if deployInput.Image {
41✔
230
                if !deployInfo.dagDeployEnabled {
3✔
231
                        return fmt.Errorf(enableDagDeployMsg, deployInfo.deploymentID) //nolint
×
232
                }
×
233
        }
234

235
        deploymentURL, err := deployment.GetDeploymentURL(deployInfo.deploymentID, deployInfo.workspaceID)
38✔
236
        if err != nil {
38✔
237
                return err
×
238
        }
×
239
        createDeployRequest := astroplatformcore.CreateDeployRequest{
38✔
240
                Description: &deployInput.Description,
38✔
241
        }
38✔
242
        switch {
38✔
243
        case deployInput.Dags:
18✔
244
                createDeployRequest.Type = astroplatformcore.CreateDeployRequestTypeDAGONLY
18✔
245
        case deployInput.Image:
1✔
246
                createDeployRequest.Type = astroplatformcore.CreateDeployRequestTypeIMAGEONLY
1✔
247
        default:
19✔
248
                createDeployRequest.Type = astroplatformcore.CreateDeployRequestTypeIMAGEANDDAG
19✔
249
        }
250
        deploy, err := createDeploy(deployInfo.organizationID, deployInfo.deploymentID, createDeployRequest, platformCoreClient)
38✔
251
        if err != nil {
38✔
252
                return err
×
253
        }
×
254
        deployID := deploy.Id
38✔
255
        if deploy.DagsUploadUrl != nil {
76✔
256
                dagsUploadURL = *deploy.DagsUploadUrl
38✔
257
        } else {
38✔
258
                dagsUploadURL = ""
×
259
        }
×
260
        if deploy.ImageTag != "" {
38✔
261
                nextTag = deploy.ImageTag
×
262
        } else {
38✔
263
                nextTag = ""
38✔
264
        }
38✔
265

266
        if deployInput.Dags {
56✔
267
                if len(dagFiles) == 0 && config.CFG.ShowWarnings.GetBool() {
19✔
268
                        i, _ := input.Confirm("Warning: No DAGs found. This will delete any existing DAGs. Are you sure you want to deploy?")
1✔
269

1✔
270
                        if !i {
2✔
271
                                fmt.Println("Canceling deploy...")
1✔
272
                                return nil
1✔
273
                        }
1✔
274
                }
275
                if deployInput.Pytest != "" {
29✔
276
                        runtimeVersion, err := buildImage(deployInput.Path, deployInfo.currentVersion, deployInfo.deployImage, deployInput.ImageName, deployInfo.organizationID, deployInput.BuildSecretString, deployInfo.dagDeployEnabled, deployInfo.isRemoteExecutionEnabled, deployInput.ForceUpgradeToAF3, platformCoreClient)
12✔
277
                        if err != nil {
12✔
278
                                return err
×
279
                        }
×
280

281
                        err = parseOrPytestDAG(deployInput.Pytest, runtimeVersion, deployInput.EnvFile, deployInfo.deployImage, deployInfo.namespace, deployInput.BuildSecretString)
12✔
282
                        if err != nil {
14✔
283
                                return err
2✔
284
                        }
2✔
285
                }
286

287
                if !deployInfo.dagDeployEnabled {
16✔
288
                        return fmt.Errorf(enableDagDeployMsg, deployInfo.deploymentID) //nolint
1✔
289
                }
1✔
290

291
                fmt.Println("Initiating DAG deploy for: " + deployInfo.deploymentID)
14✔
292
                dagTarballVersion, err = deployDags(deployInput.Path, dagsPath, dagsUploadURL, deployInfo.currentVersion, astroplatformcore.DeploymentType(deployInfo.deploymentType))
14✔
293
                if err != nil {
14✔
294
                        if strings.Contains(err.Error(), dagDeployDisabled) {
×
295
                                return fmt.Errorf(enableDagDeployMsg, deployInfo.deploymentID) //nolint
×
296
                        }
×
297

298
                        return err
×
299
                }
300

301
                // finish deploy
302
                err = finalizeDeploy(deployID, deployInfo.deploymentID, deployInfo.organizationID, dagTarballVersion, deployInfo.dagDeployEnabled, platformCoreClient)
14✔
303
                if err != nil {
14✔
304
                        return err
×
305
                }
×
306

307
                if deployInput.WaitForStatus {
15✔
308
                        // Keeping wait timeout low since dag only deploy is faster
1✔
309
                        err = deployment.HealthPoll(deployInfo.deploymentID, deployInfo.workspaceID, dagOnlyDeploySleepTime, tickNum, timeoutNum, platformCoreClient)
1✔
310
                        if err != nil {
2✔
311
                                return err
1✔
312
                        }
1✔
313

314
                        fmt.Println(
×
315
                                "\nSuccessfully uploaded DAGs with version " + ansi.Bold(dagTarballVersion) + " to Astro. Navigate to the Airflow UI to confirm that your deploy was successful." +
×
316
                                        fmt.Sprintf(accessYourDeploymentFmt, ansi.Bold(deploymentURL), ansi.Bold(deployInfo.webserverURL)),
×
317
                        )
×
318

×
319
                        return nil
×
320
                }
321

322
                fmt.Println(
13✔
323
                        "\nSuccessfully uploaded DAGs with version " + ansi.Bold(
13✔
324
                                dagTarballVersion,
13✔
325
                        ) + " to Astro. Navigate to the Airflow UI to confirm that your deploy was successful. The Airflow UI takes about 1 minute to update." +
13✔
326
                                fmt.Sprintf(
13✔
327
                                        accessYourDeploymentFmt,
13✔
328
                                        ansi.Bold(deploymentURL),
13✔
329
                                        ansi.Bold(deployInfo.webserverURL),
13✔
330
                                ),
13✔
331
                )
13✔
332
        } else {
20✔
333
                fullpath := filepath.Join(deployInput.Path, ".dockerignore")
20✔
334
                fileExist, _ := fileutil.Exists(fullpath, nil)
20✔
335
                if fileExist {
40✔
336
                        err := removeDagsFromDockerIgnore(fullpath)
20✔
337
                        if err != nil {
20✔
338
                                return errors.Wrap(err, "Found dags entry in .dockerignore file. Remove this entry and try again")
×
339
                        }
×
340
                }
341
                envFileExists, _ := fileutil.Exists(deployInput.EnvFile, nil)
20✔
342
                if !envFileExists && deployInput.EnvFile != ".env" {
21✔
343
                        return fmt.Errorf("%w %s", envFileMissing, deployInput.EnvFile)
1✔
344
                }
1✔
345

346
                if deployInfo.dagDeployEnabled && len(dagFiles) == 0 && config.CFG.ShowWarnings.GetBool() && !deployInput.Image {
19✔
347
                        i, _ := input.Confirm("Warning: No DAGs found. This will delete any existing DAGs. Are you sure you want to deploy?")
×
348

×
349
                        if !i {
×
350
                                fmt.Println("Canceling deploy...")
×
351
                                return nil
×
352
                        }
×
353
                }
354

355
                // Build our image
356
                runtimeVersion, err := buildImage(deployInput.Path, deployInfo.currentVersion, deployInfo.deployImage, deployInput.ImageName, deployInfo.organizationID, deployInput.BuildSecretString, deployInfo.dagDeployEnabled, deployInfo.isRemoteExecutionEnabled, deployInput.ForceUpgradeToAF3, platformCoreClient)
19✔
357
                if err != nil {
19✔
358
                        return err
×
359
                }
×
360

361
                if len(dagFiles) > 0 {
26✔
362
                        err = parseOrPytestDAG(deployInput.Pytest, runtimeVersion, deployInput.EnvFile, deployInfo.deployImage, deployInfo.namespace, deployInput.BuildSecretString)
7✔
363
                        if err != nil {
8✔
364
                                return err
1✔
365
                        }
1✔
366
                } else {
12✔
367
                        fmt.Println("No DAGs found. Skipping testing...")
12✔
368
                }
12✔
369

370
                repository := deploy.ImageRepository
18✔
371
                // TODO: Resolve the edge case where two people push the same nextTag at the same time
18✔
372
                remoteImage := fmt.Sprintf("%s:%s", repository, nextTag)
18✔
373

18✔
374
                imageHandler := airflowImageHandler(deployInfo.deployImage)
18✔
375
                _, err = imageHandler.Push(remoteImage, registryUsername, c.Token, false)
18✔
376
                if err != nil {
18✔
377
                        return err
×
378
                }
×
379

380
                if deployInfo.dagDeployEnabled && len(dagFiles) > 0 {
24✔
381
                        if !deployInput.Image {
12✔
382
                                dagTarballVersion, err = deployDags(deployInput.Path, dagsPath, dagsUploadURL, deployInfo.currentVersion, astroplatformcore.DeploymentType(deployInfo.deploymentType))
6✔
383
                                if err != nil {
6✔
384
                                        return err
×
385
                                }
×
386
                        } else {
×
387
                                fmt.Println("Image Deploy only. Skipping deploying DAG...")
×
388
                        }
×
389
                }
390
                // finish deploy
391
                err = finalizeDeploy(deployID, deployInfo.deploymentID, deployInfo.organizationID, dagTarballVersion, deployInfo.dagDeployEnabled, platformCoreClient)
18✔
392
                if err != nil {
18✔
393
                        return err
×
394
                }
×
395

396
                if deployInput.WaitForStatus {
20✔
397
                        err = deployment.HealthPoll(deployInfo.deploymentID, deployInfo.workspaceID, sleepTime, tickNum, timeoutNum, platformCoreClient)
2✔
398
                        if err != nil {
4✔
399
                                return err
2✔
400
                        }
2✔
401
                }
402

403
                fmt.Println("Successfully pushed image to Astronomer registry. Navigate to the Astronomer UI for confirmation that your deploy was successful. To deploy dags only run astro deploy --dags." +
16✔
404
                        fmt.Sprintf(accessYourDeploymentFmt, ansi.Bold("https://"+deploymentURL), ansi.Bold("https://"+deployInfo.webserverURL)))
16✔
405
        }
406

407
        return nil
29✔
408
}
409

410
func getDeploymentInfo(
411
        deploymentID, wsID, deploymentName string,
412
        prompt bool,
413
        platformCoreClient astroplatformcore.CoreClient,
414
        coreClient astrocore.CoreClient,
415
) (deploymentInfo, error) {
40✔
416
        // Use config deployment if provided
40✔
417
        if deploymentID == "" {
54✔
418
                deploymentID = config.CFG.ProjectDeployment.GetProjectString()
14✔
419
                if deploymentID != "" {
14✔
420
                        fmt.Printf("Deployment ID found in the config file. This Deployment ID will be used for the deploy\n")
×
421
                }
×
422
        }
423

424
        if deploymentID != "" && deploymentName != "" {
48✔
425
                fmt.Printf("Both a Deployment ID and Deployment name have been supplied. The Deployment ID %s will be used for the Deploy\n", deploymentID)
8✔
426
        }
8✔
427

428
        // check if deploymentID or if force prompt was requested was given by user
429
        if deploymentID == "" || prompt {
67✔
430
                currentDeployment, err := deployment.GetDeployment(wsID, deploymentID, deploymentName, false, nil, platformCoreClient, coreClient)
27✔
431
                if err != nil {
27✔
432
                        return deploymentInfo{}, err
×
433
                }
×
434
                coreDeployment, err := deployment.CoreGetDeployment(currentDeployment.OrganizationId, currentDeployment.Id, platformCoreClient)
27✔
435
                if err != nil {
27✔
436
                        return deploymentInfo{}, err
×
437
                }
×
438
                var desiredDagTarballVersion string
27✔
439
                if coreDeployment.DesiredDagTarballVersion != nil {
45✔
440
                        desiredDagTarballVersion = *coreDeployment.DesiredDagTarballVersion
18✔
441
                } else {
27✔
442
                        desiredDagTarballVersion = ""
9✔
443
                }
9✔
444

445
                return deploymentInfo{
27✔
446
                        currentDeployment.Id,
27✔
447
                        currentDeployment.Namespace,
27✔
448
                        airflow.ImageName(currentDeployment.Namespace, "latest"),
27✔
449
                        currentDeployment.RuntimeVersion,
27✔
450
                        currentDeployment.OrganizationId,
27✔
451
                        currentDeployment.WorkspaceId,
27✔
452
                        currentDeployment.WebServerUrl,
27✔
453
                        string(*currentDeployment.Type),
27✔
454
                        desiredDagTarballVersion,
27✔
455
                        currentDeployment.IsDagDeployEnabled,
27✔
456
                        currentDeployment.IsCicdEnforced,
27✔
457
                        currentDeployment.Name,
27✔
458
                        deployment.IsRemoteExecutionEnabled(&currentDeployment),
27✔
459
                }, nil
27✔
460
        }
461
        c, err := config.GetCurrentContext()
13✔
462
        if err != nil {
13✔
463
                return deploymentInfo{}, err
×
464
        }
×
465
        deployInfo, err := getImageName(deploymentID, c.Organization, platformCoreClient)
13✔
466
        if err != nil {
13✔
467
                return deploymentInfo{}, err
×
468
        }
×
469
        deployInfo.deploymentID = deploymentID
13✔
470
        return deployInfo, nil
13✔
471
}
472

473
func parseOrPytestDAG(pytest, runtimeVersion, envFile, deployImage, namespace, buildSecretString string) error {
19✔
474
        validDAGParseVersion := airflowversions.CompareRuntimeVersions(runtimeVersion, dagParseAllowedVersion) >= 0
19✔
475
        if !validDAGParseVersion {
19✔
476
                fmt.Println("\nruntime image is earlier than 4.1.0, this deploy will skip DAG parse...")
×
477
        }
×
478

479
        containerHandler, err := containerHandlerInit(config.WorkingPath, envFile, "Dockerfile", namespace)
19✔
480
        if err != nil {
19✔
481
                return err
×
482
        }
×
483

484
        switch {
19✔
485
        case pytest == parse && validDAGParseVersion:
7✔
486
                // parse dags
7✔
487
                fmt.Println("Testing image...")
7✔
488
                err := parseDAGs(deployImage, buildSecretString, containerHandler)
7✔
489
                if err != nil {
9✔
490
                        return err
2✔
491
                }
2✔
492
        case pytest != "" && pytest != parse && pytest != parseAndPytest:
6✔
493
                // check pytests
6✔
494
                fmt.Println("Testing image...")
6✔
495
                err := checkPytest(pytest, deployImage, buildSecretString, containerHandler)
6✔
496
                if err != nil {
7✔
497
                        return err
1✔
498
                }
1✔
499
        case pytest == parseAndPytest:
6✔
500
                // parse dags and check pytests
6✔
501
                fmt.Println("Testing image...")
6✔
502
                err := parseDAGs(deployImage, buildSecretString, containerHandler)
6✔
503
                if err != nil {
6✔
504
                        return err
×
505
                }
×
506

507
                err = checkPytest(pytest, deployImage, buildSecretString, containerHandler)
6✔
508
                if err != nil {
6✔
509
                        return err
×
510
                }
×
511
        }
512
        return nil
16✔
513
}
514

515
func parseDAGs(deployImage, buildSecretString string, containerHandler airflow.ContainerHandler) error {
13✔
516
        if !config.CFG.SkipParse.GetBool() && !util.CheckEnvBool(os.Getenv("ASTRONOMER_SKIP_PARSE")) {
26✔
517
                err := containerHandler.Parse("", deployImage, buildSecretString)
13✔
518
                if err != nil {
15✔
519
                        fmt.Println(err)
2✔
520
                        return errDagsParseFailed
2✔
521
                }
2✔
522
        } else {
×
523
                fmt.Println("Skipping parsing dags due to skip parse being set to true in either the config.yaml or local environment variables")
×
524
        }
×
525

526
        return nil
11✔
527
}
528

529
// Validate code with pytest
530
func checkPytest(pytest, deployImage, buildSecretString string, containerHandler airflow.ContainerHandler) error {
14✔
531
        if pytest != allTests && pytest != parseAndPytest {
18✔
532
                pytestFile = pytest
4✔
533
        }
4✔
534

535
        exitCode, err := containerHandler.Pytest(pytestFile, "", deployImage, "", buildSecretString)
14✔
536
        if err != nil {
17✔
537
                if strings.Contains(exitCode, "1") { // exit code is 1 meaning tests failed
4✔
538
                        return errors.New("at least 1 pytest in your tests directory failed. Fix the issues listed or rerun the command without the '--pytest' flag to deploy")
1✔
539
                }
1✔
540
                return errors.Wrap(err, "Something went wrong while Pytesting your DAGs,\nif the issue persists rerun the command without the '--pytest' flag to deploy")
2✔
541
        }
542

543
        fmt.Print("\nAll Pytests passed!\n")
11✔
544
        return err
11✔
545
}
546

547
func getImageName(deploymentID, organizationID string, platformCoreClient astroplatformcore.CoreClient) (deploymentInfo, error) {
13✔
548
        resp, err := platformCoreClient.GetDeploymentWithResponse(httpContext.Background(), organizationID, deploymentID)
13✔
549
        if err != nil {
13✔
550
                return deploymentInfo{}, err
×
551
        }
×
552

553
        err = astrocore.NormalizeAPIError(resp.HTTPResponse, resp.Body)
13✔
554
        if err != nil {
13✔
555
                return deploymentInfo{}, err
×
556
        }
×
557

558
        currentVersion := resp.JSON200.RuntimeVersion
13✔
559
        namespace := resp.JSON200.Namespace
13✔
560
        workspaceID := resp.JSON200.WorkspaceId
13✔
561
        webserverURL := resp.JSON200.WebServerUrl
13✔
562
        dagDeployEnabled := resp.JSON200.IsDagDeployEnabled
13✔
563
        cicdEnforcement := resp.JSON200.IsCicdEnforced
13✔
564
        isRemoteExecutionEnabled := deployment.IsRemoteExecutionEnabled(resp.JSON200)
13✔
565
        var desiredDagTarballVersion string
13✔
566
        if resp.JSON200.DesiredDagTarballVersion != nil {
18✔
567
                desiredDagTarballVersion = *resp.JSON200.DesiredDagTarballVersion
5✔
568
        } else {
13✔
569
                desiredDagTarballVersion = ""
8✔
570
        }
8✔
571

572
        // We use latest and keep this tag around after deployments to keep subsequent deploys quick
573
        deployImage := airflow.ImageName(namespace, "latest")
13✔
574

13✔
575
        return deploymentInfo{
13✔
576
                namespace:                namespace,
13✔
577
                deployImage:              deployImage,
13✔
578
                currentVersion:           currentVersion,
13✔
579
                organizationID:           organizationID,
13✔
580
                workspaceID:              workspaceID,
13✔
581
                webserverURL:             webserverURL,
13✔
582
                dagDeployEnabled:         dagDeployEnabled,
13✔
583
                desiredDagTarballVersion: desiredDagTarballVersion,
13✔
584
                cicdEnforcement:          cicdEnforcement,
13✔
585
                isRemoteExecutionEnabled: isRemoteExecutionEnabled,
13✔
586
        }, nil
13✔
587
}
588

589
func buildImageWithoutDags(path, buildSecretString string, imageHandler airflow.ImageHandler) error {
19✔
590
        // flag to determine if we are setting the dags folder in dockerignore
19✔
591
        dagsIgnoreSet := false
19✔
592
        // flag to determine if dockerignore file was created on runtime
19✔
593
        dockerIgnoreCreate := false
19✔
594
        fullpath := filepath.Join(path, ".dockerignore")
19✔
595

19✔
596
        defer func() {
38✔
597
                // remove dags from .dockerignore file if we set it
19✔
598
                if dagsIgnoreSet {
38✔
599
                        removeDagsFromDockerIgnore(fullpath) //nolint:errcheck
19✔
600
                }
19✔
601
                // remove created docker ignore file
602
                if dockerIgnoreCreate {
19✔
603
                        os.Remove(fullpath)
×
604
                }
×
605
        }()
606

607
        fileExist, _ := fileutil.Exists(fullpath, nil)
19✔
608
        if !fileExist {
19✔
609
                // Create a dockerignore file and add the dags folder entry
×
610
                err := fileutil.WriteStringToFile(fullpath, "dags/")
×
611
                if err != nil {
×
612
                        return err
×
613
                }
×
614
                dockerIgnoreCreate = true
×
615
        }
616
        lines, err := fileutil.Read(fullpath)
19✔
617
        if err != nil {
19✔
618
                return err
×
619
        }
×
620
        contains, _ := fileutil.Contains(lines, "dags/")
19✔
621
        if !contains {
38✔
622
                f, err := os.OpenFile(fullpath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) //nolint:mnd
19✔
623
                if err != nil {
19✔
624
                        return err
×
625
                }
×
626

627
                defer f.Close()
19✔
628

19✔
629
                if _, err := f.WriteString("\ndags/"); err != nil {
19✔
630
                        return err
×
631
                }
×
632

633
                dagsIgnoreSet = true
19✔
634
        }
635
        err = imageHandler.Build("", buildSecretString, types.ImageBuildConfig{Path: path, TargetPlatforms: deployImagePlatformSupport})
19✔
636
        if err != nil {
19✔
637
                return err
×
638
        }
×
639

640
        // remove dags from .dockerignore file if we set it
641
        if dagsIgnoreSet {
38✔
642
                err = removeDagsFromDockerIgnore(fullpath)
19✔
643
                if err != nil {
19✔
644
                        return err
×
645
                }
×
646
        }
647

648
        return nil
19✔
649
}
650

651
func buildImage(path, currentVersion, deployImage, imageName, organizationID, buildSecretString string, dagDeployEnabled, isRemoteExecutionEnabled, forceUpgradeToAF3 bool, platformCoreClient astroplatformcore.CoreClient) (version string, err error) {
34✔
652
        imageHandler := airflowImageHandler(deployImage)
34✔
653

34✔
654
        if imageName == "" {
61✔
655
                // Build our image
27✔
656
                fmt.Println(composeImageBuildingPromptMsg)
27✔
657

27✔
658
                if dagDeployEnabled || isRemoteExecutionEnabled {
46✔
659
                        err := buildImageWithoutDags(path, buildSecretString, imageHandler)
19✔
660
                        if err != nil {
19✔
661
                                return "", err
×
662
                        }
×
663
                } else {
8✔
664
                        err := imageHandler.Build("", buildSecretString, types.ImageBuildConfig{Path: path, TargetPlatforms: deployImagePlatformSupport})
8✔
665
                        if err != nil {
9✔
666
                                return "", err
1✔
667
                        }
1✔
668
                }
669
        } else {
7✔
670
                // skip build if an imageName is passed
7✔
671
                fmt.Println(composeSkipImageBuildingPromptMsg)
7✔
672

7✔
673
                err := imageHandler.TagLocalImage(imageName)
7✔
674
                if err != nil {
7✔
675
                        return "", err
×
676
                }
×
677
        }
678

679
        // parse dockerfile
680
        cmds, err := docker.ParseFile(filepath.Join(path, dockerfile))
33✔
681
        if err != nil {
34✔
682
                return "", errors.Wrapf(err, "failed to parse dockerfile: %s", filepath.Join(path, dockerfile))
1✔
683
        }
1✔
684

685
        DockerfileImage := docker.GetImageFromParsedFile(cmds)
32✔
686

32✔
687
        version, err = imageHandler.GetLabel("", runtimeImageLabel)
32✔
688
        if err != nil {
32✔
689
                fmt.Println("unable get runtime version from image")
×
690
        }
×
691

692
        if config.CFG.ShowWarnings.GetBool() && version == "" {
32✔
693
                fmt.Printf(warningInvalidImageNameMsg, DockerfileImage)
×
694
                fmt.Println("Canceling deploy...")
×
695
                os.Exit(1)
×
696
        }
×
697

698
        resp, err := platformCoreClient.GetDeploymentOptionsWithResponse(httpContext.Background(), organizationID, &astroplatformcore.GetDeploymentOptionsParams{})
32✔
699
        if err != nil {
33✔
700
                return "", err
1✔
701
        }
1✔
702
        err = astrocore.NormalizeAPIError(resp.HTTPResponse, resp.Body)
31✔
703
        if err != nil {
31✔
704
                return "", err
×
705
        }
×
706
        deploymentOptionsRuntimeVersions := []string{}
31✔
707
        for _, runtimeRelease := range resp.JSON200.RuntimeReleases {
217✔
708
                deploymentOptionsRuntimeVersions = append(deploymentOptionsRuntimeVersions, runtimeRelease.Version)
186✔
709
        }
186✔
710

711
        if !ValidRuntimeVersion(currentVersion, version, deploymentOptionsRuntimeVersions, forceUpgradeToAF3) {
31✔
712
                fmt.Println("Canceling deploy...")
×
713
                os.Exit(1)
×
714
        }
×
715

716
        WarnIfNonLatestVersion(version, httputil.NewHTTPClient())
31✔
717

31✔
718
        return version, nil
31✔
719
}
720

721
// finalize deploy
722
func finalizeDeploy(deployID, deploymentID, organizationID, dagTarballVersion string, dagDeploy bool, platformCoreClient astroplatformcore.CoreClient) error {
32✔
723
        finalizeDeployRequest := astroplatformcore.FinalizeDeployRequest{}
32✔
724
        if dagDeploy {
54✔
725
                finalizeDeployRequest.DagTarballVersion = &dagTarballVersion
22✔
726
        }
22✔
727
        resp, err := platformCoreClient.FinalizeDeployWithResponse(httpContext.Background(), organizationID, deploymentID, deployID, finalizeDeployRequest)
32✔
728
        if err != nil {
32✔
729
                return err
×
730
        }
×
731
        err = astrocore.NormalizeAPIError(resp.HTTPResponse, resp.Body)
32✔
732
        if err != nil {
32✔
733
                return err
×
734
        }
×
735
        if resp.JSON200.DagTarballVersion != nil {
64✔
736
                fmt.Println("Deployed DAG bundle: ", *resp.JSON200.DagTarballVersion)
32✔
737
        }
32✔
738
        if resp.JSON200.ImageTag != "" {
64✔
739
                fmt.Println("Deployed Image Tag: ", resp.JSON200.ImageTag)
32✔
740
        }
32✔
741
        return nil
32✔
742
}
743

744
func createDeploy(organizationID, deploymentID string, request astroplatformcore.CreateDeployRequest, platformCoreClient astroplatformcore.CoreClient) (*astroplatformcore.Deploy, error) {
38✔
745
        resp, err := platformCoreClient.CreateDeployWithResponse(httpContext.Background(), organizationID, deploymentID, request)
38✔
746
        if err != nil {
38✔
747
                return nil, err
×
748
        }
×
749
        err = astrocore.NormalizeAPIError(resp.HTTPResponse, resp.Body)
38✔
750
        if err != nil {
38✔
751
                return nil, err
×
752
        }
×
753
        return resp.JSON200, err
38✔
754
}
755

756
func ValidRuntimeVersion(currentVersion, tag string, deploymentOptionsRuntimeVersions []string, forceUpgradeToAF3 bool) bool {
42✔
757
        // Allow old deployments which do not have runtimeVersion tag
42✔
758
        if currentVersion == "" {
43✔
759
                return true
1✔
760
        }
1✔
761

762
        // Check that the tag is not a downgrade
763
        if airflowversions.CompareRuntimeVersions(tag, currentVersion) < 0 {
44✔
764
                fmt.Printf("Cannot deploy a downgraded Astro Runtime version. Modify your Astro Runtime version to %s or higher in your Dockerfile\n", currentVersion)
3✔
765
                return false
3✔
766
        }
3✔
767

768
        // Check that the tag is supported by the deployment
769
        tagInDeploymentOptions := false
38✔
770
        for _, runtimeVersion := range deploymentOptionsRuntimeVersions {
102✔
771
                if airflowversions.CompareRuntimeVersions(tag, runtimeVersion) == 0 {
101✔
772
                        tagInDeploymentOptions = true
37✔
773
                        break
37✔
774
                }
775
        }
776
        if !tagInDeploymentOptions {
39✔
777
                fmt.Println("Cannot deploy an unsupported Astro Runtime version. Modify your Astro Runtime version to a supported version in your Dockerfile")
1✔
778
                fmt.Printf("Supported versions: %s\n", strings.Join(deploymentOptionsRuntimeVersions, ", "))
1✔
779
                return false
1✔
780
        }
1✔
781

782
        // If upgrading from Airflow 2 to Airflow 3, we require at least Runtime 12.0.0 (Airflow 2.10.0) and that the user has forced the upgrade
783
        currentVersionAirflowMajorVersion := airflowversions.AirflowMajorVersionForRuntimeVersion(currentVersion)
37✔
784
        tagAirflowMajorVersion := airflowversions.AirflowMajorVersionForRuntimeVersion(tag)
37✔
785
        if currentVersionAirflowMajorVersion == "2" && tagAirflowMajorVersion == "3" {
40✔
786
                if airflowversions.CompareRuntimeVersions(currentVersion, "12.0.0") < 0 {
4✔
787
                        fmt.Println("Can only upgrade deployment from Airflow 2 to Airflow 3 with deployment at Astro Runtime 12.0.0 or higher")
1✔
788
                        return false
1✔
789
                }
1✔
790
                if !forceUpgradeToAF3 {
3✔
791
                        fmt.Println("Can only upgrade deployment from Airflow 2 to Airflow 3 with the --force-upgrade-to-af3 flag")
1✔
792
                        return false
1✔
793
                }
1✔
794
        }
795

796
        return true
35✔
797
}
798

799
func WarnIfNonLatestVersion(version string, httpClient *httputil.HTTPClient) {
34✔
800
        client := airflowversions.NewClient(httpClient, false, false)
34✔
801
        latestRuntimeVersion, err := airflowversions.GetDefaultImageTag(client, "", false)
34✔
802
        if err != nil {
36✔
803
                logger.Debugf("unable to get latest runtime version: %s", err)
2✔
804
                return
2✔
805
        }
2✔
806

807
        if airflowversions.CompareRuntimeVersions(version, latestRuntimeVersion) < 0 {
64✔
808
                fmt.Printf("WARNING! You are currently running Astro Runtime Version %s\nConsider upgrading to the latest version, Astro Runtime %s\n", version, latestRuntimeVersion)
32✔
809
        }
32✔
810
}
811

812
// deployClientImage handles the client deploy functionality
813
func deployClientImage(deployInput InputDeploy, c *config.Context) error { //nolint:gocritic
6✔
814
        fmt.Println("Deploying client image...")
6✔
815

6✔
816
        // Get the remote client registry endpoint from config
6✔
817
        registryEndpoint := config.CFG.RemoteClientRegistry.GetString()
6✔
818
        if registryEndpoint == "" {
7✔
819
                return errors.New("remote client registry is not configured. Please run 'astro config set remote.client_registry <endpoint>' to configure the registry")
1✔
820
        }
1✔
821

822
        // Use consistent deploy-<timestamp> tagging mechanism like regular deploys
823
        // The ImageName flag only specifies which local image to use, not the remote tag
824
        imageTag := "deploy-" + time.Now().UTC().Format("2006-01-02T15-04")
5✔
825

5✔
826
        // Build the full remote image name
5✔
827
        remoteImage := fmt.Sprintf("%s:%s", registryEndpoint, imageTag)
5✔
828

5✔
829
        // Create an image handler for building and pushing
5✔
830
        imageHandler := airflowImageHandler(remoteImage)
5✔
831

5✔
832
        if deployInput.ImageName != "" {
6✔
833
                // Use the provided local image (tag will be ignored, remote tag is always timestamp-based)
1✔
834
                fmt.Println("Using provided image:", deployInput.ImageName)
1✔
835
                err := imageHandler.TagLocalImage(deployInput.ImageName)
1✔
836
                if err != nil {
1✔
NEW
837
                        return fmt.Errorf("failed to tag local image: %w", err)
×
NEW
838
                }
×
839
        } else {
4✔
840
                // Authenticate with the base image registry before building
4✔
841
                // This is needed because Dockerfile.client uses base images from a private registry
4✔
842
                baseImageRegistry := config.CFG.RemoteBaseImageRegistry.GetString()
4✔
843
                err := airflow.DockerLogin(baseImageRegistry, registryUsername, c.Token)
4✔
844
                if err != nil {
5✔
845
                        return fmt.Errorf("failed to authenticate with registry %s: %w", baseImageRegistry, err)
1✔
846
                }
1✔
847

848
                // Build the client image from the current directory
849
                fmt.Println("Building client image...")
3✔
850

3✔
851
                // Determine target platforms for client deploy
3✔
852
                var targetPlatforms []string
3✔
853
                if deployInput.Platform != "" {
3✔
NEW
854
                        // Parse comma-separated platforms from --platform flag
×
NEW
855
                        targetPlatforms = strings.Split(deployInput.Platform, ",")
×
NEW
856
                        // Trim whitespace from each platform
×
NEW
857
                        for i, platform := range targetPlatforms {
×
NEW
858
                                targetPlatforms[i] = strings.TrimSpace(platform)
×
NEW
859
                        }
×
NEW
860
                        fmt.Printf("Building for platforms: %s\n", strings.Join(targetPlatforms, ", "))
×
861
                } else {
3✔
862
                        // Use empty slice to let Docker build for host platform by default
3✔
863
                        targetPlatforms = []string{}
3✔
864
                        fmt.Println("Building for host platform")
3✔
865
                }
3✔
866

867
                buildConfig := types.ImageBuildConfig{
3✔
868
                        Path:            deployInput.Path,
3✔
869
                        TargetPlatforms: targetPlatforms,
3✔
870
                }
3✔
871

3✔
872
                err = imageHandler.Build("Dockerfile.client", deployInput.BuildSecretString, buildConfig)
3✔
873
                if err != nil {
4✔
874
                        return fmt.Errorf("failed to build client image: %w", err)
1✔
875
                }
1✔
876
        }
877

878
        // Push the image to the remote registry
879
        fmt.Printf("Pushing client image to %s...\n", remoteImage)
3✔
880
        _, err := imageHandler.Push(remoteImage, "", "", false)
3✔
881
        if err != nil {
4✔
882
                return fmt.Errorf("failed to push client image: %w", err)
1✔
883
        }
1✔
884

885
        fmt.Printf("✓ Successfully pushed client image: %s\n", ansi.Bold(remoteImage))
2✔
886
        return nil
2✔
887
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc