• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

astronomer / astro-cli / 06dfac41-e870-4621-b93a-ad7f46b90b80

21 Oct 2025 06:42AM UTC coverage: 38.506% (-0.008%) from 38.514%
06dfac41-e870-4621-b93a-ad7f46b90b80

Pull #1964

circleci

neel-astro
Add wait time flag instead of hardcoded timeouts
Pull Request #1964: Add wait time flag instead of hardcoded timeouts

25 of 35 new or added lines in 7 files covered. (71.43%)

2 existing lines in 1 file now uncovered.

24132 of 62670 relevant lines covered (38.51%)

10.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.98
/cloud/deploy/bundle.go
1
package deploy
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "os"
8
        "path/filepath"
9
        "strings"
10
        "time"
11

12
        airflowversions "github.com/astronomer/astro-cli/airflow_versions"
13
        astrocore "github.com/astronomer/astro-cli/astro-client-core"
14
        astroplatformcore "github.com/astronomer/astro-cli/astro-client-platform-core"
15
        "github.com/astronomer/astro-cli/cloud/deployment"
16
        "github.com/astronomer/astro-cli/config"
17
        "github.com/astronomer/astro-cli/pkg/fileutil"
18
        "github.com/astronomer/astro-cli/pkg/git"
19
        "github.com/astronomer/astro-cli/pkg/logger"
20
)
21

22
type DeployBundleInput struct {
23
        BundlePath         string
24
        MountPath          string
25
        DeploymentID       string
26
        BundleType         string
27
        Description        string
28
        Wait               bool
29
        WaitTime           time.Duration
30
        PlatformCoreClient astroplatformcore.CoreClient
31
        CoreClient         astrocore.CoreClient
32
}
33

34
func DeployBundle(input *DeployBundleInput) error {
6✔
35
        c, err := config.GetCurrentContext()
6✔
36
        if err != nil {
6✔
37
                return err
×
38
        }
×
39

40
        // get the current deployment so we can check the deploy is valid
41
        currentDeployment, err := deployment.CoreGetDeployment(c.Organization, input.DeploymentID, input.PlatformCoreClient)
6✔
42
        if err != nil {
6✔
43
                return err
×
44
        }
×
45

46
        // if CI/CD is enforced, check the subject can deploy
47
        if currentDeployment.IsCicdEnforced && !canCiCdDeploy(c.Token) {
7✔
48
                return fmt.Errorf(errCiCdEnforcementUpdate, currentDeployment.Name)
1✔
49
        }
1✔
50

51
        // check the deployment is enabled for DAG deploys
52
        if !currentDeployment.IsDagDeployEnabled {
6✔
53
                return fmt.Errorf(enableDagDeployMsg, input.DeploymentID)
1✔
54
        }
1✔
55

56
        // retrieve metadata about the local Git checkout. returns nil if not available
57
        deployGit, commitMessage := retrieveLocalGitMetadata(input.BundlePath)
4✔
58

4✔
59
        // if no description was provided, use the commit message from the local Git checkout
4✔
60
        if input.Description == "" {
7✔
61
                input.Description = commitMessage
3✔
62
        }
3✔
63

64
        // initialize the deploy
65
        deploy, err := createBundleDeploy(c.Organization, input, deployGit, input.CoreClient)
4✔
66
        if err != nil {
4✔
67
                return err
×
68
        }
×
69

70
        // check we received an upload URL
71
        if deploy.BundleUploadUrl == nil {
5✔
72
                return errors.New("no bundle upload URL received from Astro")
1✔
73
        }
1✔
74

75
        // upload the bundle
76
        tarballVersion, err := UploadBundle(config.WorkingPath, input.BundlePath, *deploy.BundleUploadUrl, false, currentDeployment.RuntimeVersion)
3✔
77
        if err != nil {
3✔
78
                return err
×
79
        }
×
80

81
        // finalize the deploy
82
        err = finalizeBundleDeploy(c.Organization, input.DeploymentID, deploy.Id, tarballVersion, input.CoreClient)
3✔
83
        if err != nil {
3✔
84
                return err
×
85
        }
×
86
        fmt.Println("Successfully uploaded bundle with version " + tarballVersion + " to Astro.")
3✔
87

3✔
88
        // if requested, wait for the deploy to finish by polling the deployment until it is healthy
3✔
89
        if input.Wait {
3✔
NEW
90
                err = deployment.HealthPoll(currentDeployment.Id, currentDeployment.WorkspaceId, dagOnlyDeploySleepTime, tickNum, int(input.WaitTime.Seconds()), input.PlatformCoreClient)
×
91
                if err != nil {
×
92
                        return err
×
93
                }
×
94
        }
95

96
        return nil
3✔
97
}
98

99
type DeleteBundleInput struct {
100
        MountPath          string
101
        DeploymentID       string
102
        WorkspaceID        string
103
        BundleType         string
104
        Description        string
105
        Wait               bool
106
        WaitTime           time.Duration
107
        CoreClient         astrocore.CoreClient
108
        PlatformCoreClient astroplatformcore.CoreClient
109
}
110

111
func DeleteBundle(input *DeleteBundleInput) error {
1✔
112
        c, err := config.GetCurrentContext()
1✔
113
        if err != nil {
1✔
114
                return err
×
115
        }
×
116

117
        // initialize the deploy
118
        createInput := &DeployBundleInput{
1✔
119
                MountPath:    input.MountPath,
1✔
120
                DeploymentID: input.DeploymentID,
1✔
121
                BundleType:   input.BundleType,
1✔
122
                Description:  input.Description,
1✔
123
        }
1✔
124
        deploy, err := createBundleDeploy(c.Organization, createInput, nil, input.CoreClient)
1✔
125
        if err != nil {
1✔
126
                return err
×
127
        }
×
128

129
        // immediately finalize with no version, which will delete the bundle from the deployment
130
        err = finalizeBundleDeploy(c.Organization, input.DeploymentID, deploy.Id, "", input.CoreClient)
1✔
131
        if err != nil {
1✔
132
                return err
×
133
        }
×
134
        fmt.Println("Successfully requested bundle delete for mount path " + input.MountPath + " from Astro.")
1✔
135

1✔
136
        // if requested, wait for the deploy to finish by polling the deployment until it is healthy
1✔
137
        if input.Wait {
1✔
NEW
138
                err = deployment.HealthPoll(input.DeploymentID, input.WorkspaceID, dagOnlyDeploySleepTime, tickNum, int(input.WaitTime.Seconds()), input.PlatformCoreClient)
×
139
                if err != nil {
×
140
                        return err
×
141
                }
×
142
        }
143

144
        return nil
1✔
145
}
146

147
// ValidateBundleSymlinks checks if any symlinks within the bundlePath point outside of it
148
func ValidateBundleSymlinks(bundlePath string) error {
8✔
149
        absBundlePath, err := filepath.Abs(bundlePath)
8✔
150
        if err != nil {
8✔
151
                return fmt.Errorf("failed to get absolute path for bundle directory: %w", err)
×
152
        }
×
153

154
        err = filepath.WalkDir(bundlePath, func(path string, d os.DirEntry, err error) error {
30✔
155
                if err != nil {
22✔
156
                        return err // Propagate errors from WalkDir itself
×
157
                }
×
158

159
                // Check only for symlinks
160
                if d.Type()&os.ModeSymlink != 0 {
29✔
161
                        target, err := os.Readlink(path)
7✔
162
                        if err != nil {
7✔
163
                                logger.Debugf("Could not read symlink %s: %v", path, err)
×
164
                                return nil
×
165
                        }
×
166

167
                        // If the target is not absolute, join it with the directory containing the link
168
                        if !filepath.IsAbs(target) {
13✔
169
                                target = filepath.Join(filepath.Dir(path), target)
6✔
170
                        }
6✔
171

172
                        // Get the absolute path of the target
173
                        absTarget, err := filepath.Abs(target)
7✔
174
                        if err != nil {
7✔
175
                                logger.Debugf("Could not get absolute path for symlink target %s -> %s: %v", path, target, err)
×
176
                                return nil
×
177
                        }
×
178

179
                        // Check if the absolute target path is outside the absolute bundle path directory
180
                        if !strings.HasPrefix(absTarget, absBundlePath) {
10✔
181
                                return fmt.Errorf("symlink %s points to %s which is outside the bundle directory %s", path, target, absBundlePath)
3✔
182
                        }
3✔
183
                }
184
                return nil
19✔
185
        })
186
        if err != nil {
11✔
187
                return fmt.Errorf("bundle validation failed: %w", err)
3✔
188
        }
3✔
189

190
        return nil
5✔
191
}
192

193
func UploadBundle(tarDirPath, bundlePath, uploadURL string, prependBaseDir bool, currentRuntimeVersion string) (string, error) {
23✔
194
        // If Airflow 3.x, check for symlinks pointing outside the bundle directory
23✔
195
        if airflowversions.AirflowMajorVersionForRuntimeVersion(currentRuntimeVersion) == "3" {
23✔
196
                err := ValidateBundleSymlinks(bundlePath)
×
197
                if err != nil {
×
198
                        return "", err
×
199
                }
×
200
        }
201

202
        tarFilePath := filepath.Join(tarDirPath, "bundle.tar")
23✔
203
        tarGzFilePath := tarFilePath + ".gz"
23✔
204
        defer func() {
46✔
205
                tarFiles := []string{tarFilePath, tarGzFilePath}
23✔
206
                for _, file := range tarFiles {
69✔
207
                        err := os.Remove(file)
46✔
208
                        if err != nil {
46✔
209
                                if os.IsNotExist(err) {
×
210
                                        continue
×
211
                                }
212
                                fmt.Println("\nFailed to delete archived file: ", err.Error())
×
213
                                fmt.Println("\nPlease delete the archived file manually from path: " + file)
×
214
                        }
215
                }
216
        }()
217

218
        // Generate the bundle tar
219
        err := fileutil.Tar(bundlePath, tarFilePath, prependBaseDir, []string{".git/"})
23✔
220
        if err != nil {
23✔
221
                return "", err
×
222
        }
×
223

224
        // Gzip the tar
225
        err = fileutil.GzipFile(tarFilePath, tarGzFilePath)
23✔
226
        if err != nil {
23✔
227
                return "", err
×
228
        }
×
229

230
        tarGzFile, err := os.Open(tarGzFilePath)
23✔
231
        if err != nil {
23✔
232
                return "", err
×
233
        }
×
234
        defer tarGzFile.Close()
23✔
235

23✔
236
        versionID, err := azureUploader(uploadURL, tarGzFile)
23✔
237
        if err != nil {
23✔
238
                return "", err
×
239
        }
×
240

241
        return versionID, nil
23✔
242
}
243

244
func createBundleDeploy(organizationID string, input *DeployBundleInput, deployGit *astrocore.DeployGit, coreClient astrocore.CoreClient) (*astrocore.Deploy, error) {
5✔
245
        request := astrocore.CreateDeployRequest{
5✔
246
                Description:     &input.Description,
5✔
247
                Type:            astrocore.CreateDeployRequestTypeBUNDLE,
5✔
248
                BundleMountPath: &input.MountPath,
5✔
249
                BundleType:      &input.BundleType,
5✔
250
        }
5✔
251
        if deployGit != nil {
7✔
252
                request.Git = &astrocore.CreateDeployGitRequest{
2✔
253
                        Provider:   astrocore.CreateDeployGitRequestProvider(deployGit.Provider),
2✔
254
                        Repo:       deployGit.Repo,
2✔
255
                        Account:    deployGit.Account,
2✔
256
                        Path:       deployGit.Path,
2✔
257
                        Branch:     deployGit.Branch,
2✔
258
                        CommitSha:  deployGit.CommitSha,
2✔
259
                        CommitUrl:  fmt.Sprintf("https://github.com/%s/%s/commit/%s", deployGit.Account, deployGit.Repo, deployGit.CommitSha),
2✔
260
                        AuthorName: deployGit.AuthorName,
2✔
261
                }
2✔
262
        }
2✔
263
        resp, err := coreClient.CreateDeployWithResponse(context.Background(), organizationID, input.DeploymentID, request)
5✔
264
        if err != nil {
5✔
265
                return nil, err
×
266
        }
×
267
        err = astrocore.NormalizeAPIError(resp.HTTPResponse, resp.Body)
5✔
268
        if err != nil {
5✔
269
                return nil, err
×
270
        }
×
271
        return resp.JSON200, nil
5✔
272
}
273

274
func finalizeBundleDeploy(organizationID, deploymentID, deployID, tarballVersion string, coreClient astrocore.CoreClient) error {
4✔
275
        request := astrocore.UpdateDeployRequest{
4✔
276
                BundleTarballVersion: &tarballVersion,
4✔
277
        }
4✔
278
        resp, err := coreClient.UpdateDeployWithResponse(context.Background(), organizationID, deploymentID, deployID, request)
4✔
279
        if err != nil {
4✔
280
                return err
×
281
        }
×
282
        err = astrocore.NormalizeAPIError(resp.HTTPResponse, resp.Body)
4✔
283
        if err != nil {
4✔
284
                return err
×
285
        }
×
286
        return nil
4✔
287
}
288

289
func retrieveLocalGitMetadata(bundlePath string) (deployGit *astrocore.DeployGit, commitMessage string) {
4✔
290
        if git.HasUncommittedChanges(bundlePath) {
5✔
291
                fmt.Println("Local repository has uncommitted changes, skipping Git metadata retrieval")
1✔
292
                return nil, ""
1✔
293
        }
1✔
294

295
        deployGit = &astrocore.DeployGit{}
3✔
296

3✔
297
        // get the remote repository details, assume the remote is named "origin"
3✔
298
        repoURL, err := git.GetRemoteRepository(bundlePath, "origin")
3✔
299
        if err != nil {
4✔
300
                logger.Debugf("Failed to retrieve remote repository details, skipping Git metadata retrieval: %s", err)
1✔
301
                return nil, ""
1✔
302
        }
1✔
303
        switch repoURL.Host {
2✔
304
        case "github.com":
2✔
305
                deployGit.Provider = astrocore.DeployGitProviderGITHUB
2✔
306
        default:
×
307
                logger.Debugf("Unsupported Git provider, skipping Git metadata retrieval: %s", repoURL.Host)
×
308
                return nil, ""
×
309
        }
310
        urlPath := strings.TrimPrefix(repoURL.Path, "/")
2✔
311
        firstSlashIndex := strings.Index(urlPath, "/")
2✔
312
        if firstSlashIndex == -1 {
2✔
313
                logger.Debugf("Failed to parse remote repository path, skipping Git metadata retrieval: %s", repoURL.Path)
×
314
                return nil, ""
×
315
        }
×
316
        deployGit.Account = urlPath[:firstSlashIndex]
2✔
317
        deployGit.Repo = urlPath[firstSlashIndex+1:]
2✔
318

2✔
319
        // get the path of the bundle within the repository
2✔
320
        path, err := git.GetLocalRepositoryPathPrefix(bundlePath, bundlePath)
2✔
321
        if err != nil {
2✔
322
                logger.Debugf("Failed to retrieve local repository path prefix, skipping Git metadata retrieval: %s", err)
×
323
                return nil, ""
×
324
        }
×
325
        if path != "" {
3✔
326
                deployGit.Path = &path
1✔
327
        }
1✔
328

329
        // get the branch of the local commit
330
        branch, err := git.GetBranch(bundlePath)
2✔
331
        if err != nil {
2✔
332
                logger.Debugf("Failed to retrieve branch name, skipping Git metadata retrieval: %s", err)
×
333
                return nil, ""
×
334
        }
×
335
        deployGit.Branch = branch
2✔
336

2✔
337
        // get the local commit
2✔
338
        sha, message, authorName, _, err := git.GetHeadCommit(bundlePath)
2✔
339
        if err != nil {
2✔
340
                logger.Debugf("Failed to retrieve commit, skipping Git metadata retrieval: %s", err)
×
341
                return nil, ""
×
342
        }
×
343
        deployGit.CommitSha = sha
2✔
344
        if authorName != "" {
4✔
345
                deployGit.AuthorName = &authorName
2✔
346
        }
2✔
347

348
        // derive the remote URL of the local commit
349
        switch repoURL.Host {
2✔
350
        case "github.com":
2✔
351
                deployGit.CommitUrl = fmt.Sprintf("https://%s/%s/%s/commit/%s", repoURL.Host, deployGit.Account, deployGit.Repo, sha)
2✔
352
        default:
×
353
                logger.Debugf("Unsupported Git provider, skipping Git metadata retrieval: %s", repoURL.Host)
×
354
                return nil, ""
×
355
        }
356

357
        logger.Debugf("Retrieved Git metadata: %+v", deployGit)
2✔
358

2✔
359
        return deployGit, message
2✔
360
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc