• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #4941

03 Sep 2024 08:10PM UTC coverage: 59.189% (+0.02%) from 59.167%
#4941

push

travis-ci

web-flow
Handle storage class changes automatically in dic controller (#3353)

Until now we've never performed the necessary adjustments when the
desired storage class changed. Instead, we documented it and relied on
the user to perform the cleanup of actual sources.
This commit start doing that on behalf of the user.

Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>

21 of 37 new or added lines in 1 file covered. (56.76%)

12 existing lines in 2 files now uncovered.

16645 of 28122 relevant lines covered (59.19%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.39
/cmd/openstack-populator/openstack-populator.go
1
package main
2

3
import (
4
        "context"
5
        "crypto/tls"
6
        "crypto/x509"
7
        "errors"
8
        "flag"
9
        "io"
10
        "net"
11
        "net/http"
12
        "net/url"
13
        "os"
14
        "strconv"
15
        "strings"
16
        "time"
17

18
        "github.com/gophercloud/gophercloud/v2"
19
        "github.com/gophercloud/gophercloud/v2/openstack"
20
        "github.com/gophercloud/gophercloud/v2/openstack/image/v2/imagedata"
21
        "github.com/gophercloud/utils/v2/openstack/clientconfig"
22

23
        "k8s.io/klog/v2"
24

25
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/openstack-populator"
26
        prometheusutil "kubevirt.io/containerized-data-importer/pkg/util/prometheus"
27
)
28

29
const (
30
        regionName                  = "regionName"
31
        authTypeString              = "authType"
32
        username                    = "username"
33
        userID                      = "userID"
34
        password                    = "password"
35
        applicationCredentialID     = "applicationCredentialID"
36
        applicationCredentialName   = "applicationCredentialName"
37
        applicationCredentialSecret = "applicationCredentialSecret"
38
        token                       = "token"
39
        systemScope                 = "systemScope"
40
        projectName                 = "projectName"
41
        projectID                   = "projectID"
42
        userDomainName              = "userDomainName"
43
        userDomainID                = "userDomainID"
44
        projectDomainName           = "projectDomainName"
45
        projectDomainID             = "projectDomainID"
46
        domainName                  = "domainName"
47
        domainID                    = "domainID"
48
        defaultDomain               = "defaultDomain"
49
        insecureSkipVerify          = "insecureSkipVerify"
50
        caCert                      = "cacert"
51
        endpointAvailability        = "availability"
52
)
53

54
const (
55
        unsupportedAuthTypeErrStr = "unsupported authentication type"
56
        malformedCAErrStr         = "CA certificate is malformed, failed to configure the CA cert pool"
57
)
58

59
var supportedAuthTypes = map[string]clientconfig.AuthType{
60
        "password":              clientconfig.AuthPassword,
61
        "token":                 clientconfig.AuthToken,
62
        "applicationcredential": clientconfig.AuthV3ApplicationCredential,
63
}
64

65
type appConfig struct {
66
        identityEndpoint string
67
        imageID          string
68
        secretName       string
69
        ownerUID         string
70
        pvcSize          int64
71
        volumePath       string
72
}
73

74
type countingReader struct {
75
        reader io.ReadCloser
76
        total  int64
77
        read   *int64
78
}
79

80
func (cr *countingReader) Read(p []byte) (int, error) {
1✔
81
        n, err := cr.reader.Read(p)
1✔
82
        cr.total += int64(n)
1✔
83
        return n, err
1✔
84
}
1✔
85

86
func main() {
×
87
        klog.InitFlags(nil)
×
88

×
89
        config := &appConfig{}
×
90
        flag.StringVar(&config.identityEndpoint, "endpoint", "", "endpoint URL (https://openstack.example.com:5000/v2.0)")
×
91
        flag.StringVar(&config.secretName, "secret-name", "", "secret containing OpenStack credentials")
×
92
        flag.StringVar(&config.imageID, "image-id", "", "Openstack image ID")
×
93
        flag.StringVar(&config.volumePath, "volume-path", "", "Path to populate")
×
94
        flag.StringVar(&config.ownerUID, "owner-uid", "", "Owner UID (usually PVC UID)")
×
95
        flag.Int64Var(&config.pvcSize, "pvc-size", 0, "Size of pvc (in bytes)")
×
96
        flag.Parse()
×
97

×
98
        certsDirectory, err := os.MkdirTemp("", "certsdir")
×
99
        if err != nil {
×
100
                panic(err)
×
101
        }
102

103
        defer os.RemoveAll(certsDirectory)
×
104

×
105
        prometheusutil.StartPrometheusEndpoint("/certsdir")
×
106

×
107
        populate(config)
×
108
}
109

110
func populate(config *appConfig) {
1✔
111
        provider, err := getProviderClient(config.identityEndpoint)
1✔
112
        if err != nil {
1✔
113
                klog.Fatal(err)
×
114
        }
×
115

116
        imageReader, err := setupImageService(provider, config)
1✔
117
        if err != nil {
1✔
118
                klog.Fatal(err)
×
119
        }
×
120
        defer imageReader.Close()
1✔
121

1✔
122
        downloadAndSaveImage(config, imageReader)
1✔
123
}
124

125
func downloadAndSaveImage(config *appConfig, imageReader io.ReadCloser) {
1✔
126
        klog.Info("Downloading the image: ", config.imageID)
1✔
127
        file := openFile(config.volumePath)
1✔
128
        defer file.Close()
1✔
129

1✔
130
        createProgressCounter()
1✔
131
        writeData(imageReader, file, config)
1✔
132
}
1✔
133

134
func setupImageService(provider *gophercloud.ProviderClient, config *appConfig) (io.ReadCloser, error) {
1✔
135
        imageService, err := openstack.NewImageV2(provider, getEndpointOpts())
1✔
136
        if err != nil {
1✔
137
                return nil, err
×
138
        }
×
139

140
        imageReader, err := imagedata.Download(context.Background(), imageService, config.imageID).Extract()
1✔
141
        if err != nil {
1✔
142
                return nil, err
×
143
        }
×
144

145
        return imageReader, nil
1✔
146
}
147

148
func getEndpointOpts() gophercloud.EndpointOpts {
1✔
149
        availability := gophercloud.AvailabilityPublic
1✔
150
        if a := getStringFromSecret(endpointAvailability); a != "" {
1✔
151
                availability = gophercloud.Availability(a)
×
152
        }
×
153

154
        return gophercloud.EndpointOpts{
1✔
155
                Region:       getStringFromSecret(regionName),
1✔
156
                Availability: availability,
1✔
157
        }
1✔
158
}
159

160
func writeData(reader io.ReadCloser, file *os.File, config *appConfig) {
1✔
161
        countingReader := &countingReader{reader: reader, total: config.pvcSize, read: new(int64)}
1✔
162
        done := make(chan bool)
1✔
163

1✔
164
        go reportProgress(done, countingReader, config)
1✔
165

1✔
166
        if _, err := io.Copy(file, countingReader); err != nil {
1✔
167
                klog.Fatal(err)
×
168
        }
×
169
        done <- true
1✔
170
}
171

172
func reportProgress(done chan bool, countingReader *countingReader, config *appConfig) {
1✔
173
        for {
2✔
174
                select {
1✔
175
                case <-done:
1✔
176
                        finalizeProgress(config.ownerUID)
1✔
177
                        return
1✔
UNCOV
178
                default:
×
UNCOV
179
                        updateProgress(countingReader, config.ownerUID)
×
UNCOV
180
                        time.Sleep(1 * time.Second)
×
181
                }
182
        }
183
}
184

185
func createProgressCounter() {
1✔
186
        if err := metrics.SetupMetrics(); err != nil {
1✔
187
                klog.Error("Prometheus progress counter not registered:", err)
×
188
        } else {
1✔
189
                klog.Info("Prometheus progress counter registered.")
1✔
190
        }
1✔
191
}
192

193
func finalizeProgress(ownerUID string) {
1✔
194
        progress, err := metrics.GetPopulatorProgress(ownerUID)
1✔
195
        if err != nil {
1✔
196
                klog.Error("Error reading current progress:", err)
×
197
                return
×
198
        }
×
199

200
        remainingProgress := 100 - progress
1✔
201
        if remainingProgress > 0 {
2✔
202
                metrics.AddPopulatorProgress(ownerUID, remainingProgress)
1✔
203
        }
1✔
204

205
        klog.Info("Finished populating the volume. Progress: 100%")
1✔
206
}
207

UNCOV
208
func updateProgress(countingReader *countingReader, ownerUID string) {
×
UNCOV
209
        if countingReader.total <= 0 {
×
210
                return
×
211
        }
×
212

UNCOV
213
        progress, err := metrics.GetPopulatorProgress(ownerUID)
×
UNCOV
214
        if err != nil {
×
215
                klog.Errorf("updateProgress: failed to get metric; %v", err)
×
216
        }
×
217

UNCOV
218
        currentProgress := (float64(*countingReader.read) / float64(countingReader.total)) * 100
×
UNCOV
219

×
UNCOV
220
        if currentProgress > progress {
×
221
                metrics.AddPopulatorProgress(ownerUID, currentProgress-progress)
×
222
        }
×
223

UNCOV
224
        klog.Info("Progress: ", int64(currentProgress), "%")
×
225
}
226

227
func openFile(volumePath string) *os.File {
1✔
228
        flags := os.O_RDWR
1✔
229
        if strings.HasSuffix(volumePath, "disk.img") {
2✔
230
                flags |= os.O_CREATE
1✔
231
        }
1✔
232
        file, err := os.OpenFile(volumePath, flags, 0650)
1✔
233
        if err != nil {
1✔
234
                klog.Fatal(err)
×
235
        }
×
236
        return file
1✔
237
}
238

239
func getAuthType() (clientconfig.AuthType, error) {
1✔
240
        configuredAuthType := getStringFromSecret(authTypeString)
1✔
241
        if configuredAuthType == "" {
2✔
242
                return clientconfig.AuthPassword, nil
1✔
243
        }
1✔
244

245
        if supportedAuthType, found := supportedAuthTypes[configuredAuthType]; found {
×
246
                return supportedAuthType, nil
×
247
        }
×
248

249
        err := errors.New(unsupportedAuthTypeErrStr)
×
250
        klog.Fatal(err.Error(), "authType", configuredAuthType)
×
251
        return clientconfig.AuthType(""), err
×
252
}
253

254
func getStringFromSecret(key string) string {
1✔
255
        value := os.Getenv(key)
1✔
256
        return value
1✔
257
}
1✔
258

259
func getBoolFromSecret(key string) bool {
×
260
        if keyStr := getStringFromSecret(key); keyStr != "" {
×
261
                value, err := strconv.ParseBool(keyStr)
×
262
                if err != nil {
×
263
                        return false
×
264
                }
×
265
                return value
×
266
        }
267
        return false
×
268
}
269

270
func getProviderClient(identityEndpoint string) (*gophercloud.ProviderClient, error) {
1✔
271
        authInfo := &clientconfig.AuthInfo{
1✔
272
                AuthURL:           identityEndpoint,
1✔
273
                ProjectName:       getStringFromSecret(projectName),
1✔
274
                ProjectID:         getStringFromSecret(projectID),
1✔
275
                UserDomainName:    getStringFromSecret(userDomainName),
1✔
276
                UserDomainID:      getStringFromSecret(userDomainID),
1✔
277
                ProjectDomainName: getStringFromSecret(projectDomainName),
1✔
278
                ProjectDomainID:   getStringFromSecret(projectDomainID),
1✔
279
                DomainName:        getStringFromSecret(domainName),
1✔
280
                DomainID:          getStringFromSecret(domainID),
1✔
281
                DefaultDomain:     getStringFromSecret(defaultDomain),
1✔
282
                AllowReauth:       true,
1✔
283
        }
1✔
284

1✔
285
        var authType clientconfig.AuthType
1✔
286
        authType, err := getAuthType()
1✔
287
        if err != nil {
1✔
288
                klog.Fatal(err.Error())
×
289
                return nil, err
×
290
        }
×
291

292
        switch authType {
1✔
293
        case clientconfig.AuthPassword:
1✔
294
                authInfo.Username = getStringFromSecret(username)
1✔
295
                authInfo.UserID = getStringFromSecret(userID)
1✔
296
                authInfo.Password = getStringFromSecret(password)
1✔
297
        case clientconfig.AuthToken:
×
298
                authInfo.Token = getStringFromSecret(token)
×
299
        case clientconfig.AuthV3ApplicationCredential:
×
300
                authInfo.Username = getStringFromSecret(username)
×
301
                authInfo.ApplicationCredentialID = getStringFromSecret(applicationCredentialID)
×
302
                authInfo.ApplicationCredentialName = getStringFromSecret(applicationCredentialName)
×
303
                authInfo.ApplicationCredentialSecret = getStringFromSecret(applicationCredentialSecret)
×
304
        }
305

306
        identityURL, err := url.Parse(identityEndpoint)
1✔
307
        if err != nil {
1✔
308
                klog.Fatal(err.Error())
×
309
                return nil, err
×
310
        }
×
311

312
        var TLSClientConfig *tls.Config
1✔
313
        if identityURL.Scheme == "https" {
1✔
314
                if getBoolFromSecret(insecureSkipVerify) {
×
315
                        TLSClientConfig = &tls.Config{InsecureSkipVerify: true} //nolint:gosec
×
316
                } else {
×
317
                        cacert := []byte(getStringFromSecret(caCert))
×
318
                        if len(cacert) == 0 {
×
319
                                klog.Info("CA certificate was not provided,system CA cert pool is used")
×
320
                        } else {
×
321
                                roots := x509.NewCertPool()
×
322
                                ok := roots.AppendCertsFromPEM(cacert)
×
323
                                if !ok {
×
324
                                        err = errors.New(malformedCAErrStr)
×
325
                                        klog.Fatal(err.Error())
×
326
                                        return nil, err
×
327
                                }
×
328
                                TLSClientConfig = &tls.Config{
×
329
                                        RootCAs:    roots,
×
330
                                        MinVersion: tls.VersionTLS12,
×
331
                                }
×
332
                        }
333
                }
334
        }
335

336
        provider, err := openstack.NewClient(identityEndpoint)
1✔
337
        if err != nil {
1✔
338
                klog.Fatal(err.Error())
×
339
                return nil, err
×
340
        }
×
341

342
        provider.HTTPClient.Transport = &http.Transport{
1✔
343
                Proxy: http.ProxyFromEnvironment,
1✔
344
                DialContext: (&net.Dialer{
1✔
345
                        Timeout:   10 * time.Second,
1✔
346
                        KeepAlive: 10 * time.Second,
1✔
347
                }).DialContext,
1✔
348
                MaxIdleConns:          10,
1✔
349
                IdleConnTimeout:       10 * time.Second,
1✔
350
                TLSHandshakeTimeout:   10 * time.Second,
1✔
351
                ExpectContinueTimeout: 1 * time.Second,
1✔
352
                TLSClientConfig:       TLSClientConfig,
1✔
353
        }
1✔
354

1✔
355
        clientOpts := &clientconfig.ClientOpts{
1✔
356
                AuthType: authType,
1✔
357
                AuthInfo: authInfo,
1✔
358
        }
1✔
359

1✔
360
        opts, err := clientconfig.AuthOptions(clientOpts)
1✔
361
        if err != nil {
1✔
362
                klog.Fatal(err.Error())
×
363
                return nil, err
×
364
        }
×
365

366
        err = openstack.Authenticate(context.Background(), provider, *opts)
1✔
367
        if err != nil {
1✔
368
                klog.Fatal(err.Error())
×
369
                return nil, err
×
370
        }
×
371
        return provider, nil
1✔
372
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc