• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5491

17 Jul 2025 01:25AM UTC coverage: 59.326% (-0.2%) from 59.502%
#5491

push

travis-ci

web-flow
Populate DV with events from PVC Prime (#3764)

* update role for controller so it can get list of events

Signed-off-by: dsanatar <dsanatar@redhat.com>

* add new field index for events so we can filter by the object's name. add new function that gets all events associated with a primePvc and re-emits them for the regular pvc

Signed-off-by: dsanatar <dsanatar@redhat.com>

* add watcher for Events filtered by PVC type. modify copyEvent func to only emit unique events from primePVC

Signed-off-by: dsanatar <dsanatar@redhat.com>

* add new field index for event uids so we can filter accordingly

Signed-off-by: dsanatar <dsanatar@redhat.com>

* sort events by most recent timestamps so we can loop more efficiently to emit new events

Signed-off-by: dsanatar <dsanatar@redhat.com>

* fix linting

Signed-off-by: dsanatar <dsanatar@redhat.com>

* modify watcher to filter on only prime pvc events. move copyEvents to ReconcileTargetPvc func. modify copyEvents logic to handle edge case where events have same timestamps

Signed-off-by: dsanatar <dsanatar@redhat.com>

* add missing bracket

Signed-off-by: dsanatar <dsanatar@redhat.com>

* modify CopyEvents func to take in two client.Objects instead so we can reuse the same func when we need to copy events from primePvc to pvc and from pvc to dv

Signed-off-by: dsanatar <dsanatar@redhat.com>

* move func call to CopyEvents to emitEvents func so it only occurs when DVs status has changed

Signed-off-by: dsanatar <dsanatar@redhat.com>

* add conditional to CopyEvents so when we are handling DVs we only copy over events from the primePVC

Signed-off-by: dsanatar <dsanatar@redhat.com>

* remove debug logs

Signed-off-by: dsanatar <dsanatar@redhat.com>

* reuse existing function to add pvcPrime name annotation to import populator so that we can access the prime name downstream

Signed-off-by: dsanatar <dsanatar@redhat.com>

* update DV bound condition to include PVC Prime name if one exists while the claim is stil... (continued)

75 of 206 new or added lines in 11 files covered. (36.41%)

5 existing lines in 2 files now uncovered.

17163 of 28930 relevant lines covered (59.33%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

42.56
/pkg/uploadproxy/uploadproxy.go
1
package uploadproxy
2

3
import (
4
        "bytes"
5
        "context"
6
        "crypto/tls"
7
        "crypto/x509"
8
        "fmt"
9
        "html"
10
        "io"
11
        "log"
12
        "net/http"
13
        "net/http/httputil"
14
        "net/url"
15
        "regexp"
16
        "strconv"
17
        "strings"
18
        "time"
19

20
        "github.com/pkg/errors"
21
        "github.com/rs/cors"
22

23
        v1 "k8s.io/api/core/v1"
24
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
25
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26
        "k8s.io/apimachinery/pkg/util/wait"
27
        "k8s.io/client-go/kubernetes"
28
        "k8s.io/klog/v2"
29

30
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
31
        "kubevirt.io/containerized-data-importer/pkg/common"
32
        "kubevirt.io/containerized-data-importer/pkg/controller"
33
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
34
        "kubevirt.io/containerized-data-importer/pkg/controller/populators"
35
        "kubevirt.io/containerized-data-importer/pkg/token"
36
        "kubevirt.io/containerized-data-importer/pkg/util/cert/fetcher"
37
        cryptowatch "kubevirt.io/containerized-data-importer/pkg/util/tls-crypto-watch"
38
)
39

40
const (
41
        // healthzPath is the URL path that initHandler maps to the health check handler.
        healthzPath = "/healthz"
42

43
        // waitReadyTime is the overall timeout uploadReady gives the PVC readiness poll.
        waitReadyTime     = 10 * time.Second
44
        // waitReadyImterval is the interval between readiness polls in uploadReady.
        waitReadyImterval = time.Second
45

46
        // proxyRequestTimeout is the Timeout set on the http.Client built by CreateClient.
        proxyRequestTimeout = 24 * time.Hour
47

48
        // uploadTokenLeeway is passed to token.NewValidator in getSigningKey.
        // NOTE(review): presumably the clock-skew tolerance for token validity - confirm against pkg/token.
        uploadTokenLeeway = 10 * time.Second
49
)
50

51
// Server is the public interface to the upload proxy.
52
type Server interface {
53
        // Start runs the proxy and returns the error that stopped it. The
        // implementation in this file blocks until the HTTP server exits.
        Start() error
54
}
55

56
// CertWatcher is the interface for resources that watch certs.
57
type CertWatcher interface {
58
        // GetCertificate has the signature of tls.Config.GetCertificate;
        // getTLSConfig installs it there to supply the serving certificate.
        GetCertificate(_ *tls.ClientHelloInfo) (*tls.Certificate, error)
59
}
60

61
// ClientCreator creates *http.Clients.
62
type ClientCreator interface {
63
        // CreateClient returns the client whose Transport is used to reach the
        // upload server, or an error if the client cannot be built.
        CreateClient() (*http.Client, error)
64
}
65

66
// urlLookupFunc builds the upload server URL; resolveUploadPath calls it with
// the PVC namespace, the PVC name and the request path, in that order.
type urlLookupFunc func(string, string, string) string
67
// uploadPossibleFunc reports, via a non-nil error, that the given PVC cannot
// accept an upload; uploadReady aborts polling when it returns an error.
type uploadPossibleFunc func(*v1.PersistentVolumeClaim) error
68

69
// uploadProxyApp is the Server implementation returned by NewUploadProxy. It
// is also an http.Handler (see ServeHTTP) and is installed as the handler of
// the http.Server created in startTLS.
type uploadProxyApp struct {
70
        // bindAddress and bindPort are joined as "address:port" to form the listen address.
        bindAddress string
71
        bindPort    uint
72

73
        // client is used to read PersistentVolumeClaims.
        client kubernetes.Interface
74

75
        // cdiConfigTLSWatcher supplies the cipher suites and minimum TLS version for the listener.
        cdiConfigTLSWatcher cryptowatch.CdiConfigTLSWatcher
76

77
        // certWatcher supplies the serving certificate; when nil the server listens without TLS.
        certWatcher CertWatcher
78

79
        // clientCreator builds the HTTP client used to reach the upload server.
        clientCreator ClientCreator
80

81
        // tokenValidator validates the bearer token of upload requests; set by getSigningKey.
        tokenValidator token.Validator
82

83
        // handler is the routed handler built by initHandler.
        handler http.Handler
84

85
        // test hooks
86
        urlResolver    urlLookupFunc
87
        uploadPossible uploadPossibleFunc
88
}
89

90
// clientCreator is the ClientCreator that NewUploadProxy installs; it builds
// clients from a fetched client certificate/key and a fetched CA bundle.
type clientCreator struct {
91
        // certFetcher provides the client certificate and key bytes.
        certFetcher   fetcher.CertFetcher
92
        // bundleFetcher provides the CA bundle used as the client's root CAs.
        bundleFetcher fetcher.CertBundleFetcher
93
}
94

95
// authHeaderMatcher matches a whole Authorization header value of the form
// "Bearer <token>" (scheme matched case-insensitively) and captures the token
// as submatch 1.
var authHeaderMatcher = regexp.MustCompile(`(?i)^Bearer\s+([A-Za-z0-9\-\._~\+\/]+)$`)
96

97
// NewUploadProxy returns an initialized uploadProxyApp
98
func NewUploadProxy(bindAddress string,
99
        bindPort uint,
100
        apiServerPublicKey string,
101
        cdiConfigTLSWatcher cryptowatch.CdiConfigTLSWatcher,
102
        certWatcher CertWatcher,
103
        clientCertFetcher fetcher.CertFetcher,
104
        serverCAFetcher fetcher.CertBundleFetcher,
105
        client kubernetes.Interface) (Server, error) {
×
106
        var err error
×
107
        app := &uploadProxyApp{
×
108
                bindAddress:         bindAddress,
×
109
                bindPort:            bindPort,
×
110
                cdiConfigTLSWatcher: cdiConfigTLSWatcher,
×
111
                certWatcher:         certWatcher,
×
112
                clientCreator:       &clientCreator{certFetcher: clientCertFetcher, bundleFetcher: serverCAFetcher},
×
113
                client:              client,
×
114
                urlResolver:         controller.GetUploadServerURL,
×
115
                uploadPossible:      controller.UploadPossibleForPVC,
×
116
        }
×
117
        // retrieve RSA key used by apiserver to sign tokens
×
118
        err = app.getSigningKey(apiServerPublicKey)
×
119
        if err != nil {
×
120
                return nil, errors.Errorf("unable to retrieve apiserver signing key: %v", errors.WithStack(err))
×
121
        }
×
122

123
        app.initHandler()
×
124

×
125
        return app, nil
×
126
}
127

128
func (c *clientCreator) CreateClient() (*http.Client, error) {
1✔
129
        clientCertBytes, err := c.certFetcher.CertBytes()
1✔
130
        if err != nil {
1✔
131
                return nil, err
×
132
        }
×
133

134
        clientKeyBytes, err := c.certFetcher.KeyBytes()
1✔
135
        if err != nil {
1✔
136
                return nil, err
×
137
        }
×
138

139
        serverBundleBytes, err := c.bundleFetcher.BundleBytes()
1✔
140
        if err != nil {
1✔
141
                return nil, err
×
142
        }
×
143

144
        clientCert, err := tls.X509KeyPair(clientCertBytes, clientKeyBytes)
1✔
145
        if err != nil {
1✔
146
                return nil, err
×
147
        }
×
148

149
        caCertPool := x509.NewCertPool()
1✔
150
        if !caCertPool.AppendCertsFromPEM(serverBundleBytes) {
1✔
151
                klog.Error("Error parsing uploadserver CA bundle")
×
152
        }
×
153

154
        tlsConfig := &tls.Config{
1✔
155
                Certificates: []tls.Certificate{clientCert},
1✔
156
                RootCAs:      caCertPool,
1✔
157
                MinVersion:   tls.VersionTLS12,
1✔
158
        }
1✔
159

1✔
160
        transport := &http.Transport{TLSClientConfig: tlsConfig}
1✔
161
        return &http.Client{Transport: transport, Timeout: proxyRequestTimeout}, nil
1✔
162
}
163

164
func (app *uploadProxyApp) initHandler() {
1✔
165
        mux := http.NewServeMux()
1✔
166
        mux.HandleFunc(healthzPath, app.handleHealthzRequest)
1✔
167
        for _, path := range common.ProxyPaths {
2✔
168
                mux.HandleFunc(path, app.handleUploadRequest)
1✔
169
        }
1✔
170
        app.handler = cors.AllowAll().Handler(mux)
1✔
171
}
172

173
// ServeHTTP makes uploadProxyApp an http.Handler by delegating every request
// to the routed handler built by initHandler.
func (app *uploadProxyApp) ServeHTTP(w http.ResponseWriter, r *http.Request) {
1✔
174
        app.handler.ServeHTTP(w, r)
1✔
175
}
1✔
176

177
func (app *uploadProxyApp) handleHealthzRequest(w http.ResponseWriter, r *http.Request) {
1✔
178
        _, err := io.WriteString(w, "OK")
1✔
179
        if err != nil {
1✔
180
                klog.Errorf("handleHealthzRequest: failed to send response; %v", err)
×
181
        }
×
182
}
183

184
func (app *uploadProxyApp) handleUploadRequest(w http.ResponseWriter, r *http.Request) {
1✔
185
        tokenHeader := r.Header.Get("Authorization")
1✔
186
        if tokenHeader == "" {
2✔
187
                w.WriteHeader(http.StatusBadRequest)
1✔
188
                return
1✔
189
        }
1✔
190

191
        match := authHeaderMatcher.FindStringSubmatch(tokenHeader)
1✔
192
        if len(match) != 2 {
2✔
193
                w.WriteHeader(http.StatusBadRequest)
1✔
194
                return
1✔
195
        }
1✔
196

197
        tokenData, err := app.tokenValidator.Validate(match[1])
1✔
198
        if err != nil {
2✔
199
                w.WriteHeader(http.StatusUnauthorized)
1✔
200
                return
1✔
201
        }
1✔
202

203
        if tokenData.Operation != token.OperationUpload ||
1✔
204
                tokenData.Name == "" ||
1✔
205
                tokenData.Namespace == "" ||
1✔
206
                tokenData.Resource.Resource != "persistentvolumeclaims" {
1✔
207
                klog.Errorf("Bad token %+v", tokenData)
×
208
                w.WriteHeader(http.StatusBadRequest)
×
209
                return
×
210
        }
×
211

212
        klog.V(1).Infof("Received valid token: pvc: %s, namespace: %s", tokenData.Name, tokenData.Namespace)
1✔
213

1✔
214
        pvc, err := app.uploadReady(tokenData.Name, tokenData.Namespace)
1✔
215
        if err != nil {
1✔
216
                klog.Error(err)
×
217
                w.WriteHeader(http.StatusServiceUnavailable)
×
218
                // Return the error to the caller in the body.
×
219
                _, err = fmt.Fprint(w, html.EscapeString(err.Error()))
×
220
                if err != nil {
×
221
                        klog.Errorf("handleUploadRequest: failed to send error response: %v", err)
×
222
                }
×
223
                return
×
224
        }
225

226
        uploadPath, err := app.resolveUploadPath(pvc, tokenData.Name, r.URL.Path)
1✔
227
        if err != nil {
1✔
228
                klog.Error(err)
×
229
                w.WriteHeader(http.StatusServiceUnavailable)
×
230
                // Return the error to the caller in the body.
×
231
                _, err = fmt.Fprint(w, html.EscapeString(err.Error()))
×
232
                if err != nil {
×
233
                        klog.Errorf("handleUploadRequest: failed to send error response: %v", err)
×
234
                }
×
235
                return
×
236
        }
237

238
        app.proxyUploadRequest(uploadPath, w, r)
1✔
239
}
240

241
func (app *uploadProxyApp) resolveUploadPath(pvc *v1.PersistentVolumeClaim, pvcName, defaultPath string) (string, error) {
1✔
242
        var path string
1✔
243
        contentType := pvc.Annotations[cc.AnnContentType]
1✔
244
        switch contentType {
1✔
245
        case string(cdiv1.DataVolumeKubeVirt), "":
1✔
246
                path = defaultPath
1✔
247
        case string(cdiv1.DataVolumeArchive):
×
248
                if strings.Contains(defaultPath, "alpha") {
×
249
                        path = common.UploadArchiveAlphaPath
×
250
                } else {
×
251
                        path = common.UploadArchivePath
×
252
                }
×
253
        default:
×
254
                // Caller is escaping user-controlled strings to avoid cross-site scripting (XSS) attacks
×
255
                return "", fmt.Errorf("rejecting upload request for PVC %s - upload content-type %s is invalid", pvcName, contentType)
×
256
        }
257

258
        return app.urlResolver(pvc.Namespace, pvc.Name, path), nil
1✔
259
}
260

261
func (app *uploadProxyApp) uploadReady(pvcName, pvcNamespace string) (*v1.PersistentVolumeClaim, error) {
1✔
262
        var pvc *v1.PersistentVolumeClaim
1✔
263
        err := wait.PollUntilContextTimeout(context.TODO(), waitReadyImterval, waitReadyTime, true, func(ctx context.Context) (bool, error) {
2✔
264
                var err error
1✔
265
                pvc, err = app.client.CoreV1().PersistentVolumeClaims(pvcNamespace).Get(ctx, pvcName, metav1.GetOptions{})
1✔
266
                if err != nil {
1✔
267
                        if k8serrors.IsNotFound(err) {
×
268
                                return false, fmt.Errorf("rejecting Upload Request for PVC %s that doesn't exist", pvcName)
×
269
                        }
×
270

271
                        return false, err
×
272
                }
273
                // If using upload populator then need to check upload possibility to the PVC'
274
                if populators.IsPVCDataSourceRefKind(pvc, cdiv1.VolumeUploadSourceRef) {
1✔
275
                        pvc, err = app.getPopulationPVC(ctx, pvc, pvcNamespace)
×
276
                        if pvc == nil || err != nil {
×
277
                                return false, err
×
278
                        }
×
279
                }
280

281
                err = app.uploadPossible(pvc)
1✔
282
                if err != nil {
1✔
283
                        return false, err
×
284
                }
×
285
                phase := v1.PodPhase(pvc.Annotations[cc.AnnPodPhase])
1✔
286
                if phase == v1.PodSucceeded {
1✔
287
                        return false, fmt.Errorf("rejecting Upload Request for PVC %s that already finished uploading", pvcName)
×
288
                }
×
289

290
                ready, _ := strconv.ParseBool(pvc.Annotations[cc.AnnPodReady])
1✔
291
                return ready, nil
1✔
292
        })
293

294
        return pvc, err
1✔
295
}
296

297
func (app *uploadProxyApp) proxyUploadRequest(uploadPath string, w http.ResponseWriter, r *http.Request) {
1✔
298
        client, err := app.clientCreator.CreateClient()
1✔
299
        if err != nil {
1✔
300
                klog.Error("Error creating http client")
×
301
                w.WriteHeader(http.StatusInternalServerError)
×
302
                return
×
303
        }
×
304

305
        var buff bytes.Buffer
1✔
306
        p := &httputil.ReverseProxy{
1✔
307
                Director: func(req *http.Request) {
2✔
308
                        req.URL, _ = url.Parse(uploadPath)
1✔
309
                        if _, ok := req.Header["User-Agent"]; !ok {
2✔
310
                                // explicitly disable User-Agent so it's not set to default value
1✔
311
                                req.Header.Set("User-Agent", "")
1✔
312
                        }
1✔
313
                },
314
                Transport: client.Transport,
315
                ErrorLog:  log.New(&buff, "", 0),
316
        }
317

318
        p.ServeHTTP(w, r)
1✔
319

1✔
320
        if buff.Len() > 0 {
2✔
321
                msg := buff.String()
1✔
322
                klog.Errorf("Error in reverse proxy: %s", msg)
1✔
323
                fmt.Fprintf(w, "error in upload-proxy: %s", msg)
1✔
324
        }
1✔
325
}
326

327
func (app *uploadProxyApp) getSigningKey(publicKeyPEM string) error {
1✔
328
        publicKey, err := controller.DecodePublicKey([]byte(publicKeyPEM))
1✔
329
        if err != nil {
1✔
330
                return err
×
331
        }
×
332

333
        app.tokenValidator = token.NewValidator(common.UploadTokenIssuer, publicKey, uploadTokenLeeway)
1✔
334
        return nil
1✔
335
}
336

337
// Start implements Server by running startTLS; it returns only once the HTTP
// server has stopped, with the error that stopped it.
func (app *uploadProxyApp) Start() error {
×
338
        return app.startTLS()
×
339
}
×
340

341
func (app *uploadProxyApp) getTLSConfig() *tls.Config {
×
342
        cryptoConfig := app.cdiConfigTLSWatcher.GetCdiTLSConfig()
×
343

×
344
        //nolint:gosec // False positive (MinVersion unknown at build time)
×
345
        tlsConfig := &tls.Config{
×
346
                GetCertificate: app.certWatcher.GetCertificate,
×
347
                CipherSuites:   cryptoConfig.CipherSuites,
×
348
                MinVersion:     cryptoConfig.MinVersion,
×
349
        }
×
350

×
351
        return tlsConfig
×
352
}
×
353

354
func (app *uploadProxyApp) startTLS() error {
×
355
        var serveFunc func() error
×
356
        bindAddr := fmt.Sprintf("%s:%d", app.bindAddress, app.bindPort)
×
357

×
358
        server := &http.Server{
×
359
                Addr:              bindAddr,
×
360
                Handler:           app,
×
361
                ReadHeaderTimeout: 10 * time.Second,
×
362
        }
×
363

×
364
        if app.certWatcher != nil {
×
365
                tlsConfig := app.getTLSConfig()
×
366
                tlsConfig.GetConfigForClient = func(_ *tls.ClientHelloInfo) (*tls.Config, error) {
×
367
                        klog.V(3).Info("Getting TLS config")
×
368
                        config := app.getTLSConfig()
×
369
                        return config, nil
×
370
                }
×
371
                server.TLSConfig = tlsConfig
×
372

×
373
                serveFunc = func() error {
×
374
                        return server.ListenAndServeTLS("", "")
×
375
                }
×
376
        } else {
×
377
                serveFunc = func() error {
×
378
                        return server.ListenAndServe()
×
379
                }
×
380
        }
381

382
        errChan := make(chan error)
×
383

×
384
        go func() {
×
385
                errChan <- serveFunc()
×
386
        }()
×
387

388
        // wait for server to exit
389
        return <-errChan
×
390
}
391

392
func (app *uploadProxyApp) getPopulationPVC(ctx context.Context, pvc *v1.PersistentVolumeClaim, pvcNamespace string) (*v1.PersistentVolumeClaim, error) {
×
NEW
393
        pvcPrimeName, ok := pvc.Annotations[cc.AnnPVCPrimeName]
×
394
        if !ok {
×
395
                // wait for pvcPrimeName annotation on the pvc
×
396
                return nil, nil
×
397
        }
×
398
        pvcPrime, err := app.client.CoreV1().PersistentVolumeClaims(pvcNamespace).Get(ctx, pvcPrimeName, metav1.GetOptions{})
×
399
        if err != nil {
×
400
                if k8serrors.IsNotFound(err) {
×
401
                        return nil, fmt.Errorf("rejecting Upload Request for PVC %s, PVC' wasn't created yet", pvc.Name)
×
402
                }
×
403

404
                return nil, err
×
405
        }
406

407
        return pvcPrime, nil
×
408
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc