• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tensorchord / openmodelz / 6689709675

30 Oct 2023 07:40AM UTC coverage: 24.968% (-1.0%) from 25.971%
6689709675

Pull #194

github

xieydd
Fix ci error

Signed-off-by: xieydd <xieydd@gmail.com>
Pull Request #194: feat: Support volume

321 of 321 new or added lines in 8 files covered. (100.0%)

969 of 3881 relevant lines covered (24.97%)

1.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

8.39
/modelzetes/pkg/controller/controller.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "strings"
7
        "time"
8

9
        appsv1 "k8s.io/api/apps/v1"
10
        corev1 "k8s.io/api/core/v1"
11
        "k8s.io/apimachinery/pkg/api/errors"
12
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13
        "k8s.io/apimachinery/pkg/util/runtime"
14
        "k8s.io/apimachinery/pkg/util/wait"
15
        kubeinformers "k8s.io/client-go/informers"
16
        "k8s.io/client-go/kubernetes"
17
        "k8s.io/client-go/kubernetes/scheme"
18
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
19
        appslisters "k8s.io/client-go/listers/apps/v1"
20
        "k8s.io/client-go/tools/cache"
21
        "k8s.io/client-go/tools/record"
22
        "k8s.io/client-go/util/workqueue"
23
        glog "k8s.io/klog"
24

25
        v2alpha1 "github.com/tensorchord/openmodelz/modelzetes/pkg/apis/modelzetes/v2alpha1"
26
        clientset "github.com/tensorchord/openmodelz/modelzetes/pkg/client/clientset/versioned"
27
        faasscheme "github.com/tensorchord/openmodelz/modelzetes/pkg/client/clientset/versioned/scheme"
28
        informers "github.com/tensorchord/openmodelz/modelzetes/pkg/client/informers/externalversions"
29
        listers "github.com/tensorchord/openmodelz/modelzetes/pkg/client/listers/modelzetes/v2alpha1"
30
        "github.com/tensorchord/openmodelz/modelzetes/pkg/consts"
31
)
32

33
const (
	// controllerAgentName is the component name attached to every Kubernetes
	// Event this controller records.
	controllerAgentName = "modelz-operator"
	// functionPort is 8080. NOTE(review): not referenced in this file;
	// presumably the container/service port used by newDeployment/newService —
	// confirm before changing.
	functionPort = 8080

	// SuccessSynced is the Event reason used when a Function is synced.
	SuccessSynced = "Synced"
	// ErrResourceExists is the Event reason used when a Function fails to sync
	// because a Deployment of the same name already exists and is not
	// controlled by that Function.
	ErrResourceExists = "ErrResourceExists"

	// MessageResourceExists is the Event message (a format string taking the
	// conflicting resource's name) for the ErrResourceExists case. It names
	// this controller rather than a different product.
	MessageResourceExists = "Resource %q already exists and is not managed by " + controllerAgentName
	// MessageResourceSynced is the Event message for a successful sync.
	MessageResourceSynced = "Function synced successfully"
)
49

50
// Controller is the controller implementation for Function resources
51
type Controller struct {
52
        BaseDomain string
53
        // kubeclientset is a standard kubernetes clientset
54
        kubeclientset kubernetes.Interface
55
        // faasclientset is a clientset for our own API group
56
        faasclientset clientset.Interface
57

58
        deploymentsLister appslisters.DeploymentLister
59
        deploymentsSynced cache.InformerSynced
60
        inferenceLister   listers.InferenceLister
61
        inferencesSynced  cache.InformerSynced
62

63
        // workqueue is a rate limited work queue. This is used to queue work to be
64
        // processed instead of performing it as soon as a change happens. This
65
        // means we can ensure we only process a fixed amount of resources at a
66
        // time, and makes it easy to ensure we are never processing the same item
67
        // simultaneously in two different workers.
68
        workqueue workqueue.RateLimitingInterface
69
        // recorder is an event recorder for recording Event resources to the
70
        // Kubernetes API.
71
        recorder record.EventRecorder
72

73
        // OpenFaaS function factory
74
        factory FunctionFactory
75
}
76

77
// NewController returns a new OpenFaaS controller
78
func NewController(
79
        kubeclientset kubernetes.Interface,
80
        inferenceclientset clientset.Interface,
81
        kubeInformerFactory kubeinformers.SharedInformerFactory,
82
        inferenceInformerFactory informers.SharedInformerFactory,
83
        factory FunctionFactory) *Controller {
×
84

×
85
        // obtain references to shared index informers for the Deployment and Function types
×
86
        deploymentInformer := kubeInformerFactory.Apps().V1().Deployments()
×
87
        inferenceInformer := inferenceInformerFactory.Tensorchord().V2alpha1().Inferences()
×
88

×
89
        // Create event broadcaster
×
90
        // Add o6s types to the default Kubernetes Scheme so Events can be
×
91
        // logged for faas-controller types.
×
92
        faasscheme.AddToScheme(scheme.Scheme)
×
93
        glog.V(4).Info("Creating event broadcaster")
×
94
        eventBroadcaster := record.NewBroadcaster()
×
95
        eventBroadcaster.StartLogging(glog.V(4).Infof)
×
96
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
×
97
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
×
98

×
99
        controller := &Controller{
×
100
                kubeclientset:     kubeclientset,
×
101
                faasclientset:     inferenceclientset,
×
102
                deploymentsLister: deploymentInformer.Lister(),
×
103
                deploymentsSynced: deploymentInformer.Informer().HasSynced,
×
104
                inferenceLister:   inferenceInformer.Lister(),
×
105
                inferencesSynced:  inferenceInformer.Informer().HasSynced,
×
106
                workqueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Functions"),
×
107
                recorder:          recorder,
×
108
                factory:           factory,
×
109
        }
×
110

×
111
        glog.Info("Setting up event handlers")
×
112

×
113
        //  Add Function (OpenFaaS CRD-entry) Informer
×
114
        //
×
115
        // Set up an event handler for when Function resources change
×
116
        inferenceInformer.Informer().
×
117
                AddEventHandler(cache.ResourceEventHandlerFuncs{
×
118
                        AddFunc: controller.enqueueFunction,
×
119
                        UpdateFunc: func(old, new interface{}) {
×
120
                                controller.enqueueFunction(new)
×
121
                        },
×
122
                })
123

124
        // Set up an event handler for when functions related resources like pods, deployments, replica sets
125
        // can't be materialized. This logs abnormal events like ImagePullBackOff, back-off restarting failed container,
126
        // failed to start container, oci runtime errors, etc
127
        // Enable this with -v=3
128
        kubeInformerFactory.Core().V1().Events().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
129
                AddFunc: func(obj interface{}) {
×
130
                        key, err := cache.MetaNamespaceKeyFunc(obj)
×
131
                        if err == nil {
×
132
                                event := obj.(*corev1.Event)
×
133
                                since := time.Since(event.LastTimestamp.Time)
×
134
                                // log abnormal events occurred in the last minute
×
135
                                if since.Seconds() < 61 && strings.Contains(event.Type, "Warning") {
×
136
                                        glog.V(3).Infof("Abnormal event detected on %s %s: %s", event.LastTimestamp, key, event.Message)
×
137
                                }
×
138
                        }
139
                },
140
        })
141

142
        return controller
×
143
}
144

145
// Run will set up the event handlers for types we are interested in, as well
146
// as syncing informer caches and starting workers. It will block until stopCh
147
// is closed, at which point it will shutdown the workqueue and wait for
148
// workers to finish processing their current work items.
149
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
×
150
        defer runtime.HandleCrash()
×
151
        defer c.workqueue.ShutDown()
×
152

×
153
        // Start the informer factories to begin populating the informer caches
×
154
        // Wait for the caches to be synced before starting workers
×
155
        glog.Info("Waiting for informer caches to sync")
×
156
        if ok := cache.WaitForCacheSync(stopCh,
×
157
                c.deploymentsSynced, c.inferencesSynced); !ok {
×
158
                return fmt.Errorf("failed to wait for caches to sync")
×
159
        }
×
160

161
        glog.Info("Starting workers")
×
162
        // Launch two workers to process Function resources
×
163
        for i := 0; i < threadiness; i++ {
×
164
                go wait.Until(c.runWorker, time.Second, stopCh)
×
165
        }
×
166

167
        glog.Info("Started workers")
×
168
        <-stopCh
×
169
        glog.Info("Shutting down workers")
×
170

×
171
        return nil
×
172
}
173

174
// runWorker is a long-running function that will continually call the
175
// processNextWorkItem function in order to read and process a message on the workqueue.
176
func (c *Controller) runWorker() {
×
177
        for c.processNextWorkItem() {
×
178
        }
×
179
}
180

181
// processNextWorkItem will read a single work item off the workqueue and
182
// attempt to process it, by calling the syncHandler.
183
func (c *Controller) processNextWorkItem() bool {
×
184
        obj, shutdown := c.workqueue.Get()
×
185

×
186
        if shutdown {
×
187
                return false
×
188
        }
×
189

190
        err := func(obj interface{}) error {
×
191
                defer c.workqueue.Done(obj)
×
192
                var key string
×
193
                var ok bool
×
194
                if key, ok = obj.(string); !ok {
×
195
                        c.workqueue.Forget(obj)
×
196
                        runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
×
197
                        return nil
×
198
                }
×
199
                if err := c.syncHandler(key); err != nil {
×
200
                        return fmt.Errorf("error syncing '%s': %s", key, err.Error())
×
201
                }
×
202
                c.workqueue.Forget(obj)
×
203
                return nil
×
204
        }(obj)
205

206
        if err != nil {
×
207
                runtime.HandleError(err)
×
208
                return true
×
209
        }
×
210

211
        return true
×
212
}
213

214
// syncHandler compares the actual state with the desired, and attempts to
215
// converge the two.
216
func (c *Controller) syncHandler(key string) error {
×
217
        // Convert the namespace/name string into a distinct namespace and name
×
218
        namespace, name, err := cache.SplitMetaNamespaceKey(key)
×
219
        if err != nil {
×
220
                runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
×
221
                return nil
×
222
        }
×
223

224
        // Get the Function resource with this namespace/name
225
        function, err := c.inferenceLister.Inferences(namespace).Get(name)
×
226
        if err != nil {
×
227
                // The Function resource may no longer exist, in which case we stop processing.
×
228
                if errors.IsNotFound(err) {
×
229
                        runtime.HandleError(fmt.Errorf("function '%s' in work queue no longer exists", key))
×
230
                        return nil
×
231
                }
×
232

233
                return err
×
234
        }
235

236
        deploymentName := function.Spec.Name
×
237
        if deploymentName == "" {
×
238
                // We choose to absorb the error here as the worker would requeue the
×
239
                // resource otherwise. Instead, the next time the resource is updated
×
240
                // the resource will be queued again.
×
241
                runtime.HandleError(fmt.Errorf("%s: deployment name must be specified", key))
×
242
                return nil
×
243
        }
×
244

245
        if function.Spec.Annotations != nil {
×
246
                if _, ok := function.Spec.Annotations[consts.AnnotationBuilding]; ok {
×
247
                        glog.Infof("Function '%s' is still building", function.Spec.Name)
×
248
                        return nil
×
249
                }
×
250
        }
251

252
        // Create persistentvolume if needed.
253
        if len(function.Spec.Volumes) != 0 {
×
254
                for _, volume := range function.Spec.Volumes {
×
255

×
256
                        if (volume.Type == v2alpha1.VolumeTypeLocal) && (len(volume.NodeNames) == 0) {
×
257
                                // no need create pv and pvc for hostPath
×
258
                                continue
×
259
                        }
260
                        pvName := makePersistentVolumeName(volume.Name)
×
261
                        _, err := c.kubeclientset.CoreV1().PersistentVolumes().Get(context.TODO(), pvName, metav1.GetOptions{})
×
262
                        if errors.IsNotFound(err) {
×
263
                                err = nil
×
264
                                glog.Infof("Creating persistentvolume %s for '%s'", pvName, function.Spec.Name)
×
265
                                if _, err := c.kubeclientset.CoreV1().PersistentVolumes().Create(context.TODO(),
×
266
                                        newPersistentVolume(function, volume), metav1.CreateOptions{}); err != nil {
×
267
                                        if errors.IsAlreadyExists(err) {
×
268
                                                err = nil
×
269
                                                glog.V(2).Infof("Persistentvolume '%s' already exists. Skipping creation.", function.Spec.Name)
×
270
                                        } else {
×
271
                                                return err
×
272
                                        }
×
273
                                }
274
                        }
275

276
                        pvcName := makePersistentVolumeClaimName(volume.Name)
×
277
                        _, err = c.kubeclientset.CoreV1().PersistentVolumeClaims(function.Namespace).Get(context.TODO(), pvcName, metav1.GetOptions{})
×
278
                        if errors.IsNotFound(err) {
×
279
                                err = nil
×
280
                                glog.Infof("Creating persistentvolumeclaim %s for '%s'", pvcName, function.Spec.Name)
×
281
                                if _, err := c.kubeclientset.CoreV1().PersistentVolumeClaims(function.Namespace).Create(context.TODO(),
×
282
                                        makePersistentVolumeClaim(function, volume), metav1.CreateOptions{}); err != nil {
×
283
                                        if errors.IsAlreadyExists(err) {
×
284
                                                err = nil
×
285
                                                glog.V(2).Infof("Persistentvolumeclaim '%s' already exists. Skipping creation.", function.Spec.Name)
×
286
                                        } else {
×
287
                                                return err
×
288
                                        }
×
289
                                }
290
                        }
291
                }
292
        }
293
        // Create persistentvolumeclaim if needed.
294

295
        // Get the deployment with the name specified in Function.spec
296
        deployment, err := c.deploymentsLister.
×
297
                Deployments(function.Namespace).Get(deploymentName)
×
298
        // If the resource doesn't exist, we'll create it
×
299
        if errors.IsNotFound(err) {
×
300
                err = nil
×
301
                existingSecrets, err := c.getSecrets(function.Namespace, function.Spec.Secrets)
×
302
                if err != nil {
×
303
                        return err
×
304
                }
×
305

306
                glog.Infof("Creating deployment for '%s'", function.Spec.Name)
×
307
                deployment, err = c.kubeclientset.AppsV1().Deployments(function.Namespace).Create(
×
308
                        context.TODO(),
×
309
                        newDeployment(function, deployment, existingSecrets, c.factory),
×
310
                        metav1.CreateOptions{},
×
311
                )
×
312
                if err != nil {
×
313
                        return err
×
314
                }
×
315
        }
316

317
        svcGetOptions := metav1.GetOptions{}
×
318
        svcName := consts.DefaultServicePrefix + deploymentName
×
319
        _, getSvcErr := c.kubeclientset.CoreV1().Services(function.Namespace).Get(context.TODO(), deploymentName, svcGetOptions)
×
320
        if errors.IsNotFound(getSvcErr) {
×
321
                glog.Infof("Creating ClusterIP service for '%s'", function.Spec.Name)
×
322
                if _, err := c.kubeclientset.CoreV1().Services(function.Namespace).Create(context.TODO(), newService(function), metav1.CreateOptions{}); err != nil {
×
323
                        // If an error occurs during Service Create, we'll requeue the item
×
324
                        if errors.IsAlreadyExists(err) {
×
325
                                err = nil
×
326
                                glog.V(2).Infof("ClusterIP service '%s' already exists. Skipping creation.", function.Spec.Name)
×
327
                        } else {
×
328
                                return err
×
329
                        }
×
330
                }
331
        }
332

333
        // If an error occurs during Get/Create, we'll requeue the item so we can
334
        // attempt processing again later. This could have been caused by a
335
        // temporary network failure, or any other transient reason.
336
        if err != nil {
×
337
                return fmt.Errorf("transient error: %v", err)
×
338
        }
×
339

340
        // If the Deployment is not controlled by this Function resource, we should log
341
        // a warning to the event recorder and ret
342
        if !metav1.IsControlledBy(deployment, function) {
×
343
                msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
×
344
                c.recorder.Event(function, corev1.EventTypeWarning, ErrResourceExists, msg)
×
345
                return fmt.Errorf(msg)
×
346
        }
×
347

348
        // Update the Deployment resource if the Function definition differs
349
        if deploymentNeedsUpdate(function, deployment) {
×
350
                glog.Infof("Updating deployment for '%s'", function.Spec.Name)
×
351

×
352
                existingSecrets, err := c.getSecrets(function.Namespace, function.Spec.Secrets)
×
353
                if err != nil {
×
354
                        return err
×
355
                }
×
356

357
                deployment, err = c.kubeclientset.AppsV1().Deployments(function.Namespace).Update(
×
358
                        context.TODO(),
×
359
                        newDeployment(function, deployment, existingSecrets, c.factory),
×
360
                        metav1.UpdateOptions{},
×
361
                )
×
362
                if err != nil {
×
363
                        glog.Errorf("Updating deployment for '%s' failed: %v", function.Spec.Name, err)
×
364
                }
×
365

366
                existingService, err := c.kubeclientset.CoreV1().Services(function.Namespace).Get(context.TODO(), svcName, metav1.GetOptions{})
×
367
                if err != nil {
×
368
                        return err
×
369
                }
×
370

371
                existingService.Annotations = makeAnnotations(function)
×
372
                _, err = c.kubeclientset.CoreV1().Services(function.Namespace).Update(context.TODO(), existingService, metav1.UpdateOptions{})
×
373
                if err != nil {
×
374
                        glog.Errorf("Updating service for '%s' failed: %v", function.Spec.Name, err)
×
375
                }
×
376
        }
377

378
        // If an error occurs during Update, we'll requeue the item so we can
379
        // attempt processing again later. THis could have been caused by a
380
        // temporary network failure, or any other transient reason.
381
        if err != nil {
×
382
                return err
×
383
        }
×
384

385
        c.recorder.Event(function, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
×
386
        return nil
×
387
}
388

389
// enqueueFunction takes a Function resource and converts it into a namespace/name
390
// string which is then put onto the work queue. This method should *not* be
391
// passed resources of any type other than Function.
392
func (c *Controller) enqueueFunction(obj interface{}) {
×
393
        var key string
×
394
        var err error
×
395
        if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
×
396
                runtime.HandleError(err)
×
397
                return
×
398
        }
×
399
        c.workqueue.AddRateLimited(key)
×
400
}
401

402
// handleObject will take any resource implementing metav1.Object and attempt
403
// to find the Function resource that 'owns' it. It does this by looking at the
404
// objects metadata.ownerReferences field for an appropriate OwnerReference.
405
// It then enqueues that Function resource to be processed. If the object does not
406
// have an appropriate OwnerReference, it will simply be skipped.
407
func (c *Controller) handleObject(obj interface{}) {
×
408
        var object metav1.Object
×
409
        var ok bool
×
410
        if object, ok = obj.(metav1.Object); !ok {
×
411
                tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
×
412
                if !ok {
×
413
                        runtime.HandleError(fmt.Errorf("error decoding object, invalid type"))
×
414
                        return
×
415
                }
×
416
                object, ok = tombstone.Obj.(metav1.Object)
×
417
                if !ok {
×
418
                        runtime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type"))
×
419
                        return
×
420
                }
×
421
                glog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName())
×
422
        }
423
        glog.V(4).Infof("Processing object: %s", object.GetName())
×
424
        if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
×
425
                // If this object is not owned by a function, we should not do anything more
×
426
                // with it.
×
427
                if ownerRef.Kind != v2alpha1.Kind {
×
428
                        return
×
429
                }
×
430

431
                function, err := c.inferenceLister.Inferences(
×
432
                        object.GetNamespace()).Get(ownerRef.Name)
×
433
                if err != nil {
×
434
                        glog.Infof("Function '%s' deleted. Ignoring orphaned object '%s'", ownerRef.Name, object.GetSelfLink())
×
435
                        return
×
436
                }
×
437

438
                c.enqueueFunction(function)
×
439
                return
×
440
        }
441
}
442

443
// getSecrets queries Kubernetes for a list of secrets by name in the given k8s namespace.
444
func (c *Controller) getSecrets(namespace string,
445
        secretNames []string) (map[string]*corev1.Secret, error) {
×
446
        secrets := map[string]*corev1.Secret{}
×
447

×
448
        for _, secretName := range secretNames {
×
449
                secret, err := c.kubeclientset.CoreV1().
×
450
                        Secrets(namespace).Get(context.TODO(), secretName, metav1.GetOptions{})
×
451
                if err != nil {
×
452
                        return secrets, err
×
453
                }
×
454
                secrets[secretName] = secret
×
455
        }
456

457
        return secrets, nil
×
458
}
459

460
// getReplicas returns the desired number of replicas for a function taking into account
461
// the min replicas label, HPA, the autoscaler and scaled to zero deployments
462
func getReplicas(inference *v2alpha1.Inference, deployment *appsv1.Deployment) *int32 {
16✔
463
        var minReplicas, maxReplicas *int32
16✔
464
        if inference.Spec.Scaling != nil {
22✔
465
                minReplicas = inference.Spec.Scaling.MinReplicas
6✔
466
                maxReplicas = inference.Spec.Scaling.MaxReplicas
6✔
467
        }
6✔
468

469
        // extract current deployment replicas if specified
470
        var deploymentReplicas *int32
16✔
471
        if deployment != nil {
22✔
472
                deploymentReplicas = deployment.Spec.Replicas
6✔
473
        }
6✔
474

475
        // do not set replicas if min replicas is not set
476
        // and current deployment has no replicas count
477
        if minReplicas == nil && deploymentReplicas == nil {
27✔
478
                return nil
11✔
479
        }
11✔
480

481
        // set replicas to min if deployment has no replicas and min replicas exists
482
        if minReplicas != nil && deploymentReplicas == nil {
6✔
483
                return minReplicas
1✔
484
        }
1✔
485

486
        // do not override replicas when min is not specified
487
        if minReplicas == nil && deploymentReplicas != nil {
5✔
488
                return deploymentReplicas
1✔
489
        }
1✔
490

491
        if minReplicas != nil && deploymentReplicas != nil {
6✔
492
                if maxReplicas == nil {
6✔
493
                        // do not override HPA or OF autoscaler replicas if the value is greater than min
3✔
494
                        if *deploymentReplicas >= *minReplicas {
4✔
495
                                return deploymentReplicas
1✔
496
                        }
1✔
497
                } else {
×
498
                        // do not override HPA or OF autoscaler replicas if the value is between min and max
×
499
                        if *deploymentReplicas >= *minReplicas &&
×
500
                                *deploymentReplicas <= *maxReplicas {
×
501
                                return deploymentReplicas
×
502
                        } else if *deploymentReplicas > *maxReplicas {
×
503
                                return maxReplicas
×
504
                        }
×
505
                }
506
        }
507

508
        return minReplicas
2✔
509
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc