• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #4738

13 Jun 2024 01:42PM UTC coverage: 59.056% (+0.03%) from 59.023%
#4738

push

travis-ci

web-flow
fix: Make valid labels regex less restrictive (#3317)

This makes the valid labels regex that is used to restrict labels that
are transferred from a prime PVC to an actual PVC less restrictive, so
labels like 'instancetype.kubevirt.io/default-preference' are transferred too.

Signed-off-by: Felix Matouschek <fmatouschek@redhat.com>

16287 of 27579 relevant lines covered (59.06%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

42.8
/pkg/uploadproxy/uploadproxy.go
1
package uploadproxy
2

3
import (
4
        "bytes"
5
        "context"
6
        "crypto/tls"
7
        "crypto/x509"
8
        "fmt"
9
        "html"
10
        "io"
11
        "log"
12
        "net/http"
13
        "net/http/httputil"
14
        "net/url"
15
        "regexp"
16
        "strconv"
17
        "strings"
18
        "time"
19

20
        "github.com/pkg/errors"
21
        "github.com/rs/cors"
22

23
        v1 "k8s.io/api/core/v1"
24
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
25
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26
        "k8s.io/apimachinery/pkg/util/wait"
27
        "k8s.io/client-go/kubernetes"
28
        "k8s.io/klog/v2"
29

30
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
31
        "kubevirt.io/containerized-data-importer/pkg/common"
32
        "kubevirt.io/containerized-data-importer/pkg/controller"
33
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
34
        "kubevirt.io/containerized-data-importer/pkg/controller/populators"
35
        "kubevirt.io/containerized-data-importer/pkg/token"
36
        "kubevirt.io/containerized-data-importer/pkg/util/cert/fetcher"
37
        cryptowatch "kubevirt.io/containerized-data-importer/pkg/util/tls-crypto-watch"
38
)
39

40
const (
41
        healthzPath = "/healthz"
42

43
        waitReadyTime     = 10 * time.Second
44
        waitReadyImterval = time.Second
45

46
        proxyRequestTimeout = 24 * time.Hour
47

48
        uploadTokenLeeway = 10 * time.Second
49
)
50

51
// Server is the public interface to the upload proxy
52
type Server interface {
53
        Start() error
54
}
55

56
// CertWatcher is the interface for resources that watch certs
57
type CertWatcher interface {
58
        GetCertificate(_ *tls.ClientHelloInfo) (*tls.Certificate, error)
59
}
60

61
// ClientCreator crates *http.Clients
62
type ClientCreator interface {
63
        CreateClient() (*http.Client, error)
64
}
65

66
type urlLookupFunc func(string, string, string) string
67
type uploadPossibleFunc func(*v1.PersistentVolumeClaim) error
68

69
type uploadProxyApp struct {
70
        bindAddress string
71
        bindPort    uint
72

73
        client kubernetes.Interface
74

75
        cdiConfigTLSWatcher cryptowatch.CdiConfigTLSWatcher
76

77
        certWatcher CertWatcher
78

79
        clientCreator ClientCreator
80

81
        tokenValidator token.Validator
82

83
        handler http.Handler
84

85
        // test hooks
86
        urlResolver    urlLookupFunc
87
        uploadPossible uploadPossibleFunc
88
}
89

90
type clientCreator struct {
91
        certFetcher   fetcher.CertFetcher
92
        bundleFetcher fetcher.CertBundleFetcher
93
}
94

95
var authHeaderMatcher = regexp.MustCompile(`(?i)^Bearer\s+([A-Za-z0-9\-\._~\+\/]+)$`)
96

97
// NewUploadProxy returns an initialized uploadProxyApp
98
func NewUploadProxy(bindAddress string,
99
        bindPort uint,
100
        apiServerPublicKey string,
101
        cdiConfigTLSWatcher cryptowatch.CdiConfigTLSWatcher,
102
        certWatcher CertWatcher,
103
        clientCertFetcher fetcher.CertFetcher,
104
        serverCAFetcher fetcher.CertBundleFetcher,
105
        client kubernetes.Interface) (Server, error) {
×
106
        var err error
×
107
        app := &uploadProxyApp{
×
108
                bindAddress:         bindAddress,
×
109
                bindPort:            bindPort,
×
110
                cdiConfigTLSWatcher: cdiConfigTLSWatcher,
×
111
                certWatcher:         certWatcher,
×
112
                clientCreator:       &clientCreator{certFetcher: clientCertFetcher, bundleFetcher: serverCAFetcher},
×
113
                client:              client,
×
114
                urlResolver:         controller.GetUploadServerURL,
×
115
                uploadPossible:      controller.UploadPossibleForPVC,
×
116
        }
×
117
        // retrieve RSA key used by apiserver to sign tokens
×
118
        err = app.getSigningKey(apiServerPublicKey)
×
119
        if err != nil {
×
120
                return nil, errors.Errorf("unable to retrieve apiserver signing key: %v", errors.WithStack(err))
×
121
        }
×
122

123
        app.initHandler()
×
124

×
125
        return app, nil
×
126
}
127

128
func (c *clientCreator) CreateClient() (*http.Client, error) {
1✔
129
        clientCertBytes, err := c.certFetcher.CertBytes()
1✔
130
        if err != nil {
1✔
131
                return nil, err
×
132
        }
×
133

134
        clientKeyBytes, err := c.certFetcher.KeyBytes()
1✔
135
        if err != nil {
1✔
136
                return nil, err
×
137
        }
×
138

139
        serverBundleBytes, err := c.bundleFetcher.BundleBytes()
1✔
140
        if err != nil {
1✔
141
                return nil, err
×
142
        }
×
143

144
        clientCert, err := tls.X509KeyPair(clientCertBytes, clientKeyBytes)
1✔
145
        if err != nil {
1✔
146
                return nil, err
×
147
        }
×
148

149
        caCertPool := x509.NewCertPool()
1✔
150
        if !caCertPool.AppendCertsFromPEM(serverBundleBytes) {
1✔
151
                klog.Error("Error parsing uploadserver CA bundle")
×
152
        }
×
153

154
        tlsConfig := &tls.Config{
1✔
155
                Certificates: []tls.Certificate{clientCert},
1✔
156
                RootCAs:      caCertPool,
1✔
157
                MinVersion:   tls.VersionTLS12,
1✔
158
        }
1✔
159
        tlsConfig.BuildNameToCertificate() //nolint:staticcheck // todo: BuildNameToCertificate() is deprecated - check this
1✔
160

1✔
161
        transport := &http.Transport{TLSClientConfig: tlsConfig}
1✔
162
        return &http.Client{Transport: transport, Timeout: proxyRequestTimeout}, nil
1✔
163
}
164

165
func (app *uploadProxyApp) initHandler() {
1✔
166
        mux := http.NewServeMux()
1✔
167
        mux.HandleFunc(healthzPath, app.handleHealthzRequest)
1✔
168
        for _, path := range common.ProxyPaths {
2✔
169
                mux.HandleFunc(path, app.handleUploadRequest)
1✔
170
        }
1✔
171
        app.handler = cors.AllowAll().Handler(mux)
1✔
172
}
173

174
func (app *uploadProxyApp) ServeHTTP(w http.ResponseWriter, r *http.Request) {
1✔
175
        app.handler.ServeHTTP(w, r)
1✔
176
}
1✔
177

178
func (app *uploadProxyApp) handleHealthzRequest(w http.ResponseWriter, r *http.Request) {
1✔
179
        _, err := io.WriteString(w, "OK")
1✔
180
        if err != nil {
1✔
181
                klog.Errorf("handleHealthzRequest: failed to send response; %v", err)
×
182
        }
×
183
}
184

185
func (app *uploadProxyApp) handleUploadRequest(w http.ResponseWriter, r *http.Request) {
1✔
186
        tokenHeader := r.Header.Get("Authorization")
1✔
187
        if tokenHeader == "" {
2✔
188
                w.WriteHeader(http.StatusBadRequest)
1✔
189
                return
1✔
190
        }
1✔
191

192
        match := authHeaderMatcher.FindStringSubmatch(tokenHeader)
1✔
193
        if len(match) != 2 {
2✔
194
                w.WriteHeader(http.StatusBadRequest)
1✔
195
                return
1✔
196
        }
1✔
197

198
        tokenData, err := app.tokenValidator.Validate(match[1])
1✔
199
        if err != nil {
2✔
200
                w.WriteHeader(http.StatusUnauthorized)
1✔
201
                return
1✔
202
        }
1✔
203

204
        if tokenData.Operation != token.OperationUpload ||
1✔
205
                tokenData.Name == "" ||
1✔
206
                tokenData.Namespace == "" ||
1✔
207
                tokenData.Resource.Resource != "persistentvolumeclaims" {
1✔
208
                klog.Errorf("Bad token %+v", tokenData)
×
209
                w.WriteHeader(http.StatusBadRequest)
×
210
                return
×
211
        }
×
212

213
        klog.V(1).Infof("Received valid token: pvc: %s, namespace: %s", tokenData.Name, tokenData.Namespace)
1✔
214

1✔
215
        pvc, err := app.uploadReady(tokenData.Name, tokenData.Namespace)
1✔
216
        if err != nil {
1✔
217
                klog.Error(err)
×
218
                w.WriteHeader(http.StatusServiceUnavailable)
×
219
                // Return the error to the caller in the body.
×
220
                _, err = fmt.Fprint(w, err.Error())
×
221
                if err != nil {
×
222
                        klog.Errorf("handleUploadRequest: failed to send error response: %v", err)
×
223
                }
×
224
                return
×
225
        }
226

227
        uploadPath, err := app.resolveUploadPath(pvc, tokenData.Name, r.URL.Path)
1✔
228
        if err != nil {
1✔
229
                klog.Error(err)
×
230
                w.WriteHeader(http.StatusServiceUnavailable)
×
231
                // Return the error to the caller in the body.
×
232
                _, err = fmt.Fprint(w, err.Error())
×
233
                if err != nil {
×
234
                        klog.Errorf("handleUploadRequest: failed to send error response: %v", err)
×
235
                }
×
236
                return
×
237
        }
238

239
        app.proxyUploadRequest(uploadPath, w, r)
1✔
240
}
241

242
func (app *uploadProxyApp) resolveUploadPath(pvc *v1.PersistentVolumeClaim, pvcName, defaultPath string) (string, error) {
1✔
243
        var path string
1✔
244
        contentType := pvc.Annotations[cc.AnnContentType]
1✔
245
        switch contentType {
1✔
246
        case string(cdiv1.DataVolumeKubeVirt), "":
1✔
247
                path = defaultPath
1✔
248
        case string(cdiv1.DataVolumeArchive):
×
249
                if strings.Contains(defaultPath, "alpha") {
×
250
                        path = common.UploadArchiveAlphaPath
×
251
                } else {
×
252
                        path = common.UploadArchivePath
×
253
                }
×
254
        default:
×
255
                // Escaping user-controlled strings to avoid cross-site scripting (XSS) attacks
×
256
                return "", fmt.Errorf("rejecting upload request for PVC %s - upload content-type %s is invalid", html.EscapeString(pvcName), html.EscapeString(contentType))
×
257
        }
258

259
        return app.urlResolver(pvc.Namespace, pvc.Name, path), nil
1✔
260
}
261

262
func (app *uploadProxyApp) uploadReady(pvcName, pvcNamespace string) (*v1.PersistentVolumeClaim, error) {
1✔
263
        var pvc *v1.PersistentVolumeClaim
1✔
264
        err := wait.PollUntilContextTimeout(context.TODO(), waitReadyImterval, waitReadyTime, true, func(ctx context.Context) (bool, error) {
2✔
265
                var err error
1✔
266
                pvc, err = app.client.CoreV1().PersistentVolumeClaims(pvcNamespace).Get(ctx, pvcName, metav1.GetOptions{})
1✔
267
                if err != nil {
1✔
268
                        if k8serrors.IsNotFound(err) {
×
269
                                return false, fmt.Errorf("rejecting Upload Request for PVC %s that doesn't exist", pvcName)
×
270
                        }
×
271

272
                        return false, err
×
273
                }
274
                // If using upload populator then need to check upload possibility to the PVC'
275
                if populators.IsPVCDataSourceRefKind(pvc, cdiv1.VolumeUploadSourceRef) {
1✔
276
                        pvc, err = app.getPopulationPVC(ctx, pvc, pvcNamespace)
×
277
                        if pvc == nil || err != nil {
×
278
                                return false, err
×
279
                        }
×
280
                }
281

282
                err = app.uploadPossible(pvc)
1✔
283
                if err != nil {
1✔
284
                        return false, err
×
285
                }
×
286
                phase := v1.PodPhase(pvc.Annotations[cc.AnnPodPhase])
1✔
287
                if phase == v1.PodSucceeded {
1✔
288
                        return false, fmt.Errorf("rejecting Upload Request for PVC %s that already finished uploading", pvcName)
×
289
                }
×
290

291
                ready, _ := strconv.ParseBool(pvc.Annotations[cc.AnnPodReady])
1✔
292
                return ready, nil
1✔
293
        })
294

295
        return pvc, err
1✔
296
}
297

298
func (app *uploadProxyApp) proxyUploadRequest(uploadPath string, w http.ResponseWriter, r *http.Request) {
1✔
299
        client, err := app.clientCreator.CreateClient()
1✔
300
        if err != nil {
1✔
301
                klog.Error("Error creating http client")
×
302
                w.WriteHeader(http.StatusInternalServerError)
×
303
                return
×
304
        }
×
305

306
        var buff bytes.Buffer
1✔
307
        p := &httputil.ReverseProxy{
1✔
308
                Director: func(req *http.Request) {
2✔
309
                        req.URL, _ = url.Parse(uploadPath)
1✔
310
                        if _, ok := req.Header["User-Agent"]; !ok {
2✔
311
                                // explicitly disable User-Agent so it's not set to default value
1✔
312
                                req.Header.Set("User-Agent", "")
1✔
313
                        }
1✔
314
                },
315
                Transport: client.Transport,
316
                ErrorLog:  log.New(&buff, "", 0),
317
        }
318

319
        p.ServeHTTP(w, r)
1✔
320

1✔
321
        if buff.Len() > 0 {
2✔
322
                msg := buff.String()
1✔
323
                klog.Errorf("Error in reverse proxy: %s", msg)
1✔
324
                fmt.Fprintf(w, "error in upload-proxy: %s", msg)
1✔
325
        }
1✔
326
}
327

328
func (app *uploadProxyApp) getSigningKey(publicKeyPEM string) error {
1✔
329
        publicKey, err := controller.DecodePublicKey([]byte(publicKeyPEM))
1✔
330
        if err != nil {
1✔
331
                return err
×
332
        }
×
333

334
        app.tokenValidator = token.NewValidator(common.UploadTokenIssuer, publicKey, uploadTokenLeeway)
1✔
335
        return nil
1✔
336
}
337

338
func (app *uploadProxyApp) Start() error {
×
339
        return app.startTLS()
×
340
}
×
341

342
func (app *uploadProxyApp) getTLSConfig() *tls.Config {
×
343
        cryptoConfig := app.cdiConfigTLSWatcher.GetCdiTLSConfig()
×
344

×
345
        //nolint:gosec // False positive (MinVersion unknown at build time)
×
346
        tlsConfig := &tls.Config{
×
347
                GetCertificate: app.certWatcher.GetCertificate,
×
348
                CipherSuites:   cryptoConfig.CipherSuites,
×
349
                MinVersion:     cryptoConfig.MinVersion,
×
350
        }
×
351

×
352
        return tlsConfig
×
353
}
×
354

355
func (app *uploadProxyApp) startTLS() error {
×
356
        var serveFunc func() error
×
357
        bindAddr := fmt.Sprintf("%s:%d", app.bindAddress, app.bindPort)
×
358

×
359
        server := &http.Server{
×
360
                Addr:              bindAddr,
×
361
                Handler:           app,
×
362
                ReadHeaderTimeout: 10 * time.Second,
×
363
        }
×
364

×
365
        if app.certWatcher != nil {
×
366
                tlsConfig := app.getTLSConfig()
×
367
                tlsConfig.GetConfigForClient = func(_ *tls.ClientHelloInfo) (*tls.Config, error) {
×
368
                        klog.V(3).Info("Getting TLS config")
×
369
                        config := app.getTLSConfig()
×
370
                        return config, nil
×
371
                }
×
372
                server.TLSConfig = tlsConfig
×
373

×
374
                serveFunc = func() error {
×
375
                        return server.ListenAndServeTLS("", "")
×
376
                }
×
377
        } else {
×
378
                serveFunc = func() error {
×
379
                        return server.ListenAndServe()
×
380
                }
×
381
        }
382

383
        errChan := make(chan error)
×
384

×
385
        go func() {
×
386
                errChan <- serveFunc()
×
387
        }()
×
388

389
        // wait for server to exit
390
        return <-errChan
×
391
}
392

393
func (app *uploadProxyApp) getPopulationPVC(ctx context.Context, pvc *v1.PersistentVolumeClaim, pvcNamespace string) (*v1.PersistentVolumeClaim, error) {
×
394
        pvcPrimeName, ok := pvc.Annotations[populators.AnnPVCPrimeName]
×
395
        if !ok {
×
396
                // wait for pvcPrimeName annotation on the pvc
×
397
                return nil, nil
×
398
        }
×
399
        pvcPrime, err := app.client.CoreV1().PersistentVolumeClaims(pvcNamespace).Get(ctx, pvcPrimeName, metav1.GetOptions{})
×
400
        if err != nil {
×
401
                if k8serrors.IsNotFound(err) {
×
402
                        return nil, fmt.Errorf("rejecting Upload Request for PVC %s, PVC' wasn't created yet", pvc.Name)
×
403
                }
×
404

405
                return nil, err
×
406
        }
407

408
        return pvcPrime, nil
×
409
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc