• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

elastic / cloudbeat / 13989782125

21 Mar 2025 10:32AM UTC coverage: 75.671% (+0.05%) from 75.619%
13989782125

push

github

web-flow
[8.x](backport #3090) cnvm: Delete snapshots after scanning them (#3127)

### Summary of your changes
Fixes various underlying issues with CNVM snapshot deletion. The logic here is to do a best-effort attempt to clean up snapshots created during the run both continuously (after we are done scanning the snapshot) and on shutdown. Cleaning old snapshots that we don't use anymore is part of https://github.com/elastic/cloudbeat/issues/3105. Issues fixed:
- `internal/flavors/vulnerability.go`: Wait for `Run()` to finish; this ensures that the final snapshot clean-up is done after execution finishes
- `internal/resources/providers/awslib/ec2/provider.go`: Give extra retries to snapshot deletion, mainly avoiding "too many requests" errors
- `internal/vulnerability/snapshot.go`: New snapshot manager to handle creation, deletion and clean-up of snapshots. The deletion extends the `context.Context` with an extra 30s timeout to give a grace period to clean up snapshots during shutdown/restart.
- `internal/vulnerability/replicator.go`: Add dependency to the snapshot manager instead of `provider` to track created snapshots
- `internal/vulnerability/scanner.go`: Delete snapshot after scanning
- `internal/vulnerability/worker.go`: `defer` a call to the snapshot manager's cleanup

### Screenshot/Data
1. The way I verified we avoid leftover snapshots is to change the name of the snapshots:
  ```diff
  diff --git a/internal/resources/providers/awslib/ec2/provider.go b/internal/resources/providers/awslib/ec2/provider.go
  index 14abc5bf..3faeef7d 100644
  --- a/internal/resources/providers/awslib/ec2/provider.go
  +++ b/internal/resources/providers/awslib/ec2/provider.go
  @@ -78,7 +78,7 @@ func (p *Provider) CreateSnapshots(ctx context.Context, ins *Ec2Instance) ([]EBS
 			  {
 				  ResourceType: "snapshot",
 				  Tags: []types.Tag{
  -					{Key: aws.String("Name"), Value: aws.String(fmt.Sprintf("elastic-vulnerability-%s", *ins.InstanceId))},
  +					{Key: aws.String("Name"), V... (continued)

151 of 171 new or added lines in 10 files covered. (88.3%)

1 existing line in 1 file now uncovered.

8989 of 11879 relevant lines covered (75.67%)

16.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.05
/internal/vulnerability/worker.go
1
// Licensed to Elasticsearch B.V. under one or more contributor
2
// license agreements. See the NOTICE file distributed with
3
// this work for additional information regarding copyright
4
// ownership. Elasticsearch B.V. licenses this file to you under
5
// the Apache License, Version 2.0 (the "License"); you may
6
// not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
//
9
//     http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17

18
package vulnerability
19

20
import (
21
        "context"
22
        "fmt"
23
        "sync"
24
        "time"
25

26
        "github.com/elastic/beats/v7/libbeat/beat"
27

28
        "github.com/elastic/cloudbeat/internal/config"
29
        "github.com/elastic/cloudbeat/internal/dataprovider"
30
        "github.com/elastic/cloudbeat/internal/infra/clog"
31
        "github.com/elastic/cloudbeat/internal/resources/providers/awslib"
32
        "github.com/elastic/cloudbeat/internal/resources/providers/awslib/ec2"
33
)
34

35
// VulnerabilityWorker holds the logger, the pipeline stages (fetcher, replicator,
// verifier, evaluator, eventsCreator), the runner and the snapshot manager that
// Run drives.
type VulnerabilityWorker struct {
36
        log           *clog.Logger
37
        fetcher       VulnerabilityFetcher
38
        replicator    VulnerabilityReplicator
39
        verifier      VulnerabilityVerifier
40
        evaluator     VulnerabilityScanner
41
        // runner is closed by Run when it returns.
        runner        VulnerabilityRunner
42
        eventsCreator EventsCreator
43
        // manager is the snapshot manager; Run defers its Cleanup.
        manager       *SnapshotManager
44
}
45

46
// NewVulnerabilityWorker builds a VulnerabilityWorker together with every component
// it drives. It initializes the AWS config from c.CloudConfig.Aws.Cred, creates an
// EC2 provider, and hands that provider to the snapshot manager, the fetcher and the
// verifier; the replicator and the scanner receive the snapshot manager instead.
//
// It returns an error when the AWS config, the runner, or the scanner cannot be
// created.
func NewVulnerabilityWorker(ctx context.Context, log *clog.Logger, c *config.Config, bdp dataprovider.CommonDataProvider, cdp dataprovider.ElasticCommonDataProvider) (*VulnerabilityWorker, error) {
1✔
47
        log.Debug("VulnerabilityWorker: New")
1✔
48
        awsConfig, err := awslib.InitializeAWSConfig(c.CloudConfig.Aws.Cred)
1✔
49
        if err != nil {
1✔
50
                return nil, fmt.Errorf("VulnerabilityWorker: failed to initialize AWS credentials: %w", err)
×
51
        }
×
52

53
        // A single provider instance is shared by the manager, fetcher and verifier below.
        provider := ec2.NewCurrentRegionEC2Provider(ctx, log, "", *awsConfig, &awslib.MultiRegionClientFactory[ec2.Client]{})
1✔
54
        manager := NewSnapshotManager(log, provider)
1✔
55
        fetcher := NewVulnerabilityFetcher(log, provider)
1✔
56
        replicator := NewVulnerabilityReplicator(log, manager)
1✔
57
        verifier := NewVulnerabilityVerifier(log, provider)
1✔
58
        runner, err := NewVulnerabilityRunner(ctx, log)
1✔
59
        if err != nil {
1✔
60
                return nil, fmt.Errorf("VulnerabilityWorker: could not get init NewVulnerabilityRunner: %w", err)
×
61
        }
×
62
        // TODO: Replace sequence with more generic approach
63
        // NOTE(review): time.Now() presumably marks the start of this scan sequence — confirm against NewVulnerabilityScanner.
        evaluator, err := NewVulnerabilityScanner(log, runner, manager, c, time.Now())
1✔
64
        if err != nil {
1✔
65
                return nil, fmt.Errorf("VulnerabilityWorker: could not get init NewVulnerabilityScanner: %w", err)
×
66
        }
×
67

68
        eventsCreator := NewEventsCreator(log, c, bdp, cdp)
1✔
69

1✔
70
        return &VulnerabilityWorker{
1✔
71
                log:           log,
1✔
72
                fetcher:       fetcher,
1✔
73
                replicator:    replicator,
1✔
74
                verifier:      verifier,
1✔
75
                evaluator:     evaluator,
1✔
76
                runner:        runner,
1✔
77
                eventsCreator: eventsCreator,
1✔
78
                manager:       manager,
1✔
79
        }, nil
1✔
80
}
81

82
// Run starts the pipeline jobs (FetchInstances, SnapshotInstance, VerifySnapshot,
// ScanSnapshot, CreateEvents), each in its own goroutine, and blocks until all of
// them have returned. Every job after the first reads from the GetChan() of the
// stage before it.
//
// If ctx is already done on entry, no job is started. The runner is closed on every
// return path; the snapshot manager's Cleanup is deferred only once the ctx check
// has passed.
func (f *VulnerabilityWorker) Run(ctx context.Context) {
1✔
83
        f.log.Info("Starting VulnerabilityWorker.work")
1✔
84
        // Registered first, so it is the last deferred call to run.
        // NOTE(review): ctx may already be canceled when this runs — confirm Close tolerates that.
        defer func() {
2✔
85
                if err := f.runner.Close(ctx); err != nil {
1✔
86
                        f.log.Warnf("error during runner closing %s", err.Error())
×
87
                }
×
88
        }()
89

90
        // Do not start any job when the context is already done.
        if ctx.Err() != nil {
1✔
91
                f.log.Info("VulnerabilityWorker.work context canceled")
×
92
                return
×
NEW
93
        }
×
94

95
        // Registered after the runner close above, so Cleanup runs before the runner is closed.
        // NOTE(review): ctx may already be canceled when this runs — confirm Cleanup tolerates that.
        defer f.manager.Cleanup(ctx)
1✔
96

1✔
97
        jobs := []struct {
1✔
98
                name string
1✔
99
                fn   func(ctx context.Context) error
1✔
100
        }{
1✔
101
                {
1✔
102
                        name: "FetchInstances",
1✔
103
                        fn:   f.fetcher.FetchInstances,
1✔
104
                },
1✔
105
                {
1✔
106
                        name: "SnapshotInstance",
1✔
107
                        // Wrapped to fit fn's signature; this wrapper always reports success.
                        fn: func(ctx context.Context) error {
2✔
108
                                f.replicator.SnapshotInstance(ctx, f.fetcher.GetChan())
1✔
109
                                return nil
1✔
110
                        },
1✔
111
                },
112
                {
113
                        name: "VerifySnapshot",
114
                        fn: func(ctx context.Context) error {
1✔
115
                                f.verifier.VerifySnapshot(ctx, f.replicator.GetChan())
1✔
116
                                return nil
1✔
117
                        },
1✔
118
                },
119
                {
120
                        name: "ScanSnapshot",
121
                        fn: func(ctx context.Context) error {
1✔
122
                                f.evaluator.ScanSnapshot(ctx, f.verifier.GetChan())
1✔
123
                                return nil
1✔
124
                        },
1✔
125
                },
126
                {
127
                        name: "CreateEvents",
128
                        fn: func(ctx context.Context) error {
1✔
129
                                f.eventsCreator.CreateEvents(ctx, f.evaluator.GetChan())
1✔
130
                                return nil
1✔
131
                        },
1✔
132
                },
133
        }
134
        var wg sync.WaitGroup
1✔
135
        wg.Add(len(jobs))
1✔
136
        for _, job := range jobs {
6✔
137
                // NOTE(review): the goroutine closes over the loop variable job; this relies on
                // per-iteration loop variables (Go 1.22+) — confirm the module's Go version.
                go func() {
10✔
138
                        defer wg.Done()
5✔
139
                        err := job.fn(ctx)
5✔
140
                        if err != nil {
5✔
NEW
141
                                f.log.Errorf("VulnerabilityWorker.work job %s failed: %s", job.name, err.Error())
×
142
                        } else {
5✔
143
                                f.log.Infof("VulnerabilityWorker.work job %s finished", job.name)
5✔
144
                        }
5✔
145
                }()
146
        }
147

148
        f.log.Info("VulnerabilityWorker.work waiting on workers")
1✔
149
        // Block until every job goroutine has called wg.Done.
        wg.Wait()
1✔
150
        f.log.Info("VulnerabilityWorker.work finished waiting on workers")
1✔
151
}
152

153
// GetChan returns the events creator's channel, which carries batches of beat.Event.
func (f *VulnerabilityWorker) GetChan() chan []beat.Event {
2✔
154
        return f.eventsCreator.GetChan()
2✔
155
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc